Debug/extra logging (#509)
* logs * poll log * more logs * time log for upload steps * Adds metric and log for block retrieval time * adds some logging * move logging for storestream block indices * Log at start and end of block iteration * applies branch blockstore-bugfix * Cleanup * Cleanup
This commit is contained in:
parent
238771eb73
commit
f10c3f3934
|
@ -78,13 +78,15 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
|
||||||
trace "About to sleep discovery loop"
|
trace "About to sleep discovery loop"
|
||||||
await sleepAsync(b.discoveryLoopSleep)
|
await sleepAsync(b.discoveryLoopSleep)
|
||||||
|
|
||||||
proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} =
|
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
|
||||||
while b.discEngineRunning:
|
while b.discEngineRunning:
|
||||||
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
|
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
|
||||||
|
trace "Begin iterating blocks..."
|
||||||
for c in cids:
|
for c in cids:
|
||||||
if cid =? await c:
|
if cid =? await c:
|
||||||
await b.advertiseQueue.put(cid)
|
await b.advertiseQueue.put(cid)
|
||||||
await sleepAsync(50.millis)
|
await sleepAsync(50.millis)
|
||||||
|
trace "Iterating blocks finished."
|
||||||
|
|
||||||
trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
|
trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
|
||||||
await sleepAsync(b.advertiseLoopSleep)
|
await sleepAsync(b.advertiseLoopSleep)
|
||||||
|
@ -256,7 +258,7 @@ proc new*(
|
||||||
advertiseType = BlockType.Both
|
advertiseType = BlockType.Both
|
||||||
): DiscoveryEngine =
|
): DiscoveryEngine =
|
||||||
## Create a discovery engine instance for advertising services
|
## Create a discovery engine instance for advertising services
|
||||||
##
|
##
|
||||||
DiscoveryEngine(
|
DiscoveryEngine(
|
||||||
localStore: localStore,
|
localStore: localStore,
|
||||||
peers: peers,
|
peers: peers,
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import std/tables
|
import std/tables
|
||||||
|
import std/monotimes
|
||||||
|
|
||||||
import pkg/upraises
|
import pkg/upraises
|
||||||
|
|
||||||
|
@ -24,6 +25,7 @@ logScope:
|
||||||
topics = "codex pendingblocks"
|
topics = "codex pendingblocks"
|
||||||
|
|
||||||
declareGauge(codexBlockExchangePendingBlockRequests, "codex blockexchange pending block requests")
|
declareGauge(codexBlockExchangePendingBlockRequests, "codex blockexchange pending block requests")
|
||||||
|
declareGauge(codexBlockExchangeRetrievalTimeUs, "codex blockexchange block retrieval time us")
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultBlockTimeout* = 10.minutes
|
DefaultBlockTimeout* = 10.minutes
|
||||||
|
@ -32,6 +34,7 @@ type
|
||||||
BlockReq* = object
|
BlockReq* = object
|
||||||
handle*: Future[Block]
|
handle*: Future[Block]
|
||||||
inFlight*: bool
|
inFlight*: bool
|
||||||
|
startTime*: int64
|
||||||
|
|
||||||
PendingBlocksManager* = ref object of RootObj
|
PendingBlocksManager* = ref object of RootObj
|
||||||
blocks*: Table[Cid, BlockReq] # pending Block requests
|
blocks*: Table[Cid, BlockReq] # pending Block requests
|
||||||
|
@ -52,7 +55,8 @@ proc getWantHandle*(
|
||||||
if cid notin p.blocks:
|
if cid notin p.blocks:
|
||||||
p.blocks[cid] = BlockReq(
|
p.blocks[cid] = BlockReq(
|
||||||
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
|
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
|
||||||
inFlight: inFlight)
|
inFlight: inFlight,
|
||||||
|
startTime: getMonoTime().ticks)
|
||||||
|
|
||||||
trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight
|
trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight
|
||||||
|
|
||||||
|
@ -80,6 +84,12 @@ proc resolve*(p: PendingBlocksManager,
|
||||||
if not pending[].handle.completed:
|
if not pending[].handle.completed:
|
||||||
trace "Resolving block", cid = blk.cid
|
trace "Resolving block", cid = blk.cid
|
||||||
pending[].handle.complete(blk)
|
pending[].handle.complete(blk)
|
||||||
|
let
|
||||||
|
startTime = pending[].startTime
|
||||||
|
stopTime = getMonoTime().ticks
|
||||||
|
retrievalDurationUs = (stopTime - startTime) div 1000
|
||||||
|
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
|
||||||
|
trace "Block retrieval time", retrievalDurationUs
|
||||||
|
|
||||||
proc setInFlight*(p: PendingBlocksManager,
|
proc setInFlight*(p: PendingBlocksManager,
|
||||||
cid: Cid,
|
cid: Cid,
|
||||||
|
|
|
@ -227,7 +227,8 @@ proc store*(
|
||||||
|
|
||||||
trace "Stored data", manifestCid = manifest.cid,
|
trace "Stored data", manifestCid = manifest.cid,
|
||||||
contentCid = cid,
|
contentCid = cid,
|
||||||
blocks = blockManifest.len
|
blocks = blockManifest.len,
|
||||||
|
size=blockManifest.originalBytes
|
||||||
|
|
||||||
# Announce manifest
|
# Announce manifest
|
||||||
await self.discovery.provide(manifest.cid)
|
await self.discovery.provide(manifest.cid)
|
||||||
|
|
|
@ -247,13 +247,15 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||||
trace "Uploaded file", cid
|
trace "Uploaded file", cid
|
||||||
return RestApiResponse.response($cid)
|
return RestApiResponse.response($cid)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
|
trace "Upload cancelled error"
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
except AsyncStreamError:
|
except AsyncStreamError:
|
||||||
|
trace "Async stream error"
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
finally:
|
finally:
|
||||||
await reader.closeWait()
|
await reader.closeWait()
|
||||||
|
|
||||||
# if we got here something went wrong?
|
trace "Something went wrong error"
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
|
|
||||||
router.api(
|
router.api(
|
||||||
|
|
|
@ -52,7 +52,7 @@ proc new*(
|
||||||
pad = true
|
pad = true
|
||||||
): StoreStream =
|
): StoreStream =
|
||||||
## Create a new StoreStream instance for a given store and manifest
|
## Create a new StoreStream instance for a given store and manifest
|
||||||
##
|
##
|
||||||
result = StoreStream(
|
result = StoreStream(
|
||||||
store: store,
|
store: store,
|
||||||
manifest: manifest,
|
manifest: manifest,
|
||||||
|
@ -79,7 +79,7 @@ method readOnce*(
|
||||||
## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`.
|
## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`.
|
||||||
## Return how many bytes were actually read before EOF was encountered.
|
## Return how many bytes were actually read before EOF was encountered.
|
||||||
## Raise exception if we are already at EOF.
|
## Raise exception if we are already at EOF.
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
|
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
|
||||||
if self.atEof:
|
if self.atEof:
|
||||||
|
|
Loading…
Reference in New Issue