From c7bc28d7232de827cd5414c99e1fb654e14a8ff4 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:31:06 +0200 Subject: [PATCH 1/2] Reduce logging during file upload (#792) * Removes warning * Updates logging for file upload * Restores trace for placing block and proof in repo store --- codex/blockexchange/engine/discovery.nim | 20 +++++++------------- codex/blockexchange/engine/engine.nim | 10 +++------- codex/blockexchange/engine/pendingblocks.nim | 2 -- codex/blockexchange/network/network.nim | 3 --- codex/discovery.nim | 5 +---- codex/node.nim | 9 +++------ codex/stores/networkstore.nim | 3 --- codex/stores/repostore.nim | 17 +++++++---------- 8 files changed, 21 insertions(+), 48 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 267d8f12..5434a5e6 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -68,13 +68,12 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = try: await b.discoveryQueue.put(cid) except CatchableError as exc: - trace "Exception in discovery loop", exc = exc.msg + warn "Exception in discovery loop", exc = exc.msg logScope: sleep = b.discoveryLoopSleep wanted = b.pendingBlocks.len - trace "About to sleep discovery loop" await sleepAsync(b.discoveryLoopSleep) proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = @@ -87,10 +86,9 @@ proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = await sleepAsync(50.millis) trace "Iterating blocks finished." - trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep await sleepAsync(b.advertiseLoopSleep) - trace "Exiting advertise task loop" + info "Exiting advertise task loop" proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = ## Run advertise tasks @@ -102,7 +100,6 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = cid = await b.advertiseQueue.get() if cid in b.inFlightAdvReqs: - trace "Advertise request already in progress", cid continue try: @@ -111,17 +108,15 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = b.inFlightAdvReqs[cid] = request codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) - trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len await request finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) - trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len except CatchableError as exc: - trace "Exception in advertise task runner", exc = exc.msg + warn "Exception in advertise task runner", exc = exc.msg - trace "Exiting advertise task runner" + info "Exiting advertise task runner" proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = ## Run discovery tasks @@ -166,9 +161,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) except CatchableError as exc: - trace "Exception in discovery task runner", exc = exc.msg + warn "Exception in discovery task runner", exc = exc.msg - trace "Exiting discovery task runner" + info "Exiting discovery task runner" proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: @@ -183,10 +178,9 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: if cid notin b.advertiseQueue: try: - trace "Queueing provide block", cid, queue = b.discoveryQueue.len b.advertiseQueue.putNoWait(cid) except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg + warn "Exception queueing discovery request", exc = exc.msg proc start*(b: DiscoveryEngine) {.async.} = ## Start the discengine task diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 291eea14..a4565b10 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -262,8 +262,6 @@ proc blockPresenceHandler*( not b.peers.anyIt( cid in it.peerHaveCids )) proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Schedule a task for new blocks", items = blocksDelivery.len - let cids = blocksDelivery.mapIt( it.blk.cid ) @@ -277,7 +275,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn if b.scheduleTask(p): trace "Task scheduled for peer", peer = p.id else: - trace "Unable to schedule task for peer", peer = p.id + warn "Unable to schedule task for peer", peer = p.id break # do next peer @@ -293,7 +291,7 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = .filterIt(it.failed) if failed.len > 0: - trace "Failed to send block request cancellations to peers", peers = failed.len + warn "Failed to send block request cancellations to peers", peers = failed.len proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = var cids = initHashSet[Cid]() @@ -309,8 +307,6 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = return cids.toSeq proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Resolving blocks", blocks = blocksDelivery.len - b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) let announceCids = getAnnouceCids(blocksDelivery) @@ -618,7 +614,7 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = trace "Got new task from queue", peerId = peerCtx.id await b.taskHandler(peerCtx) - trace "Exiting blockexc task runner" + info "Exiting blockexc task runner" proc new*( T: type BlockExcEngine, diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 55a9da4c..0f8cd439 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -102,8 +102,6 @@ proc resolve*( trace "Block retrieval time", retrievalDurationUs, address = bd.address else: trace "Block handle already finished", address = bd.address - do: - warn "Attempting to resolve block that's not currently a pending block", address = bd.address proc setInFlight*( p: PendingBlocksManager, diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 336e627a..d193779d 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -147,9 +147,6 @@ proc sendWantCancellations*( addresses: seq[BlockAddress]): Future[void] {.async.} = ## Informs a remote peer that we're no longer interested in a set of blocks ## - - trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id - await b.sendWantList(id = id, addresses = addresses, cancel = true) proc handleBlocksDelivery( diff --git a/codex/discovery.nim b/codex/discovery.nim index af269324..d3cefe05 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -84,16 +84,13 @@ method find*( method provide*(d: Discovery, cid: Cid) {.async, base.} = ## Provide a bock Cid ## - - trace "Providing block", cid let nodes = await d.protocol.addProvider( cid.toNodeId(), d.providerRecord.get) if nodes.len <= 0: - trace "Couldn't provide to any nodes!" + warn "Couldn't provide to any nodes!" - trace "Provided to nodes", nodes = nodes.len method find*( d: Discovery, diff --git a/codex/node.nim b/codex/node.nim index effde9cf..f7dffb00 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -294,7 +294,7 @@ proc store*( ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest ## - trace "Storing data" + info "Storing data" let hcodec = Sha256HashCodec @@ -308,8 +308,6 @@ proc store*( let chunk = await chunker.getBytes(); chunk.len > 0): - trace "Got data from stream", len = chunk.len - without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err: return failure(err) @@ -322,7 +320,7 @@ proc store*( cids.add(cid) if err =? (await self.networkStore.putBlock(blk)).errorOption: - trace "Unable to store block", cid = blk.cid, err = err.msg + error "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") except CancelledError as exc: raise exc @@ -353,7 +351,7 @@ proc store*( codec = dataCodec) without manifestBlk =? await self.storeManifest(manifest), err: - trace "Unable to store manifest" + error "Unable to store manifest" return failure(err) info "Stored data", manifestCid = manifestBlk.cid, @@ -361,7 +359,6 @@ proc store*( blocks = manifest.blocksCount, datasetSize = manifest.datasetSize - # Announce manifest await self.discovery.provide(manifestBlk.cid) await self.discovery.provide(treeCid) diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 39835e23..ce0761d6 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -69,9 +69,6 @@ method putBlock*( ttl = Duration.none): Future[?!void] {.async.} = ## Store block locally and notify the network ## - - trace "Putting block into network store", cid = blk.cid - let res = await self.localStore.putBlock(blk, ttl) if res.isErr: return res diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index dc63f4cd..5cabc726 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -119,7 +119,7 @@ method putCidAndProof*( without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: return failure(err) - trace "Storing block cid and proof with key", key + trace "Storing block cid and proof", blockCid, key let value = (blockCid, proof).encode() @@ -313,7 +313,7 @@ method putBlock*( return success() without key =? makePrefixKey(self.postFixLen, blk.cid), err: - trace "Error getting key from provider", err = err.msg + warn "Error getting key from provider", err = err.msg return failure(err) if await key in self.repoDs: @@ -325,8 +325,6 @@ method putBlock*( return failure( newException(QuotaUsedError, "Cannot store block, quota used!")) - trace "Storing block with key", key - var batch: seq[BatchEntry] @@ -334,22 +332,21 @@ method putBlock*( used = self.quotaUsedBytes + blk.data.len.uint if err =? (await self.repoDs.put(key, blk.data)).errorOption: - trace "Error storing block", err = err.msg + error "Error storing block", err = err.msg return failure(err) - trace "Updating quota", used batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: - trace "Unable to create block expiration metadata key", err = err.msg + warn "Unable to create block expiration metadata key", err = err.msg return failure(err) batch.add(blockExpEntry) if err =? (await self.metaDs.put(batch)).errorOption: - trace "Error updating quota bytes", err = err.msg + error "Error updating quota bytes", err = err.msg if err =? (await self.repoDs.delete(key)).errorOption: - trace "Error deleting block after failed quota update", err = err.msg + error "Error deleting block after failed quota update", err = err.msg return failure(err) return failure(err) @@ -357,7 +354,7 @@ method putBlock*( self.quotaUsedBytes = used inc self.totalBlocks if isErr (await self.persistTotalBlocksCount()): - trace "Unable to update block total metadata" + warn "Unable to update block total metadata" return failure("Unable to update block total metadata") self.updateMetrics() From c58d4d7dbe9dd4663391acbfbdc2985735067bee Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Tue, 30 Apr 2024 17:27:17 +0200 Subject: [PATCH 2/2] Bug: Large manifests encoding (#800) * Adds test for encoding/decoding manifests with large datasets * switches datasetSize to uint64 --- codex/manifest/coders.nim | 8 ++++---- tests/codex/testmanifest.nim | 10 ++++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 2e824e4d..e36039c7 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -66,7 +66,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] = var header = initProtoBuffer() header.write(1, manifest.treeCid.data.buffer) header.write(2, manifest.blockSize.uint32) - header.write(3, manifest.datasetSize.uint32) + header.write(3, manifest.datasetSize.uint64) header.write(4, manifest.codec.uint32) header.write(5, manifest.hcodec.uint32) header.write(6, manifest.version.uint32) @@ -75,7 +75,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] = erasureInfo.write(1, manifest.ecK.uint32) erasureInfo.write(2, manifest.ecM.uint32) erasureInfo.write(3, manifest.originalTreeCid.data.buffer) - erasureInfo.write(4, manifest.originalDatasetSize.uint32) + erasureInfo.write(4, manifest.originalDatasetSize.uint64) erasureInfo.write(5, manifest.protectedStrategy.uint32) if manifest.verifiable: @@ -106,12 +106,12 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = pbVerificationInfo: ProtoBuffer treeCidBuf: seq[byte] originalTreeCid: seq[byte] - datasetSize: uint32 + datasetSize: uint64 codec: uint32 hcodec: uint32 version: uint32 blockSize: uint32 - originalDatasetSize: uint32 + originalDatasetSize: uint64 ecK, ecM: uint32 protectedStrategy: uint32 verifyRoot: seq[byte] diff --git a/tests/codex/testmanifest.nim b/tests/codex/testmanifest.nim index 10fb7772..188cc957 100644 --- a/tests/codex/testmanifest.nim +++ b/tests/codex/testmanifest.nim @@ -57,6 +57,16 @@ checksuite "Manifest": check: encodeDecode(manifest) == manifest + test "Should encode/decode large manifest": + let large = Manifest.new( + treeCid = Cid.example, + blockSize = (64 * 1024).NBytes, + datasetSize = (5 * 1024).MiBs + ) + + check: + encodeDecode(large) == large + test "Should encode/decode to/from protected manifest": check: encodeDecode(protectedManifest) == protectedManifest