From 0ec52abc981772f547f58083e191594aa0f078a7 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 31 Mar 2025 06:48:22 +0200 Subject: [PATCH 1/3] fixes RandomChunker not respecting padding (#1170) --- tests/codex/helpers/randomchunker.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/codex/helpers/randomchunker.nim b/tests/codex/helpers/randomchunker.nim index cf857595..d1383e84 100644 --- a/tests/codex/helpers/randomchunker.nim +++ b/tests/codex/helpers/randomchunker.nim @@ -33,10 +33,10 @@ proc new*( return 0 var read = 0 - while read < len: + while read < len and (pad or read < size - consumed): rng.shuffle(alpha) for a in alpha: - if read >= len: + if read >= len or (not pad and read >= size - consumed): break data[read] = a From 5ec3b2b0275046cbf4a4743b18714e49e54adbfc Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 31 Mar 2025 06:57:55 +0200 Subject: [PATCH 2/3] make sure we do not call "get" on unverified Result while fetching in batches (#1169) * makes sure we do not call "get" on unverified result * make handling of failed blocks in fetchBatched even more explicit * simplifies allFinishedValues and makes it independent from allFinishedFailed * only sleep if not iter.finished in fetchBatched --- codex/errors.nim | 29 ++++++++++++++++++++++++++++- codex/node.nim | 22 ++++++++++++++-------- tests/codex/node/testnode.nim | 26 ++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/codex/errors.nim b/codex/errors.nim index fadf7299..1a571e0f 100644 --- a/codex/errors.nim +++ b/codex/errors.nim @@ -8,6 +8,8 @@ ## those terms. import std/options +import std/sugar +import std/sequtils import pkg/results import pkg/chronos @@ -42,7 +44,9 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} = else: T.failure("Option is None") -proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.async.} = +proc allFinishedFailed*[T]( + futs: seq[Future[T]] +): Future[FinishedFailed[T]] {.async: (raises: [CancelledError]).} = ## Check if all futures have finished or failed ## ## TODO: wip, not sure if we want this - at the minimum, @@ -57,3 +61,26 @@ proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.as res.success.add f return res + +proc allFinishedValues*[T]( + futs: seq[Future[T]] +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = + ## If all futures have finished, return corresponding values, + ## otherwise return failure + ## + + # wait for all futures to be either completed, failed or canceled + await allFutures(futs) + + let numOfFailed = futs.countIt(it.failed) + + if numOfFailed > 0: + return failure "Some futures failed (" & $numOfFailed & "))" + + # here, we know there are no failed futures in "futs" + # and we are only interested in those that completed successfully + let values = collect: + for b in futs: + if b.finished: + b.value + return success values diff --git a/codex/node.nim b/codex/node.nim index 9932deb6..fb653c0d 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -183,23 +183,29 @@ proc fetchBatched*( # ) while not iter.finished: - let blocks = collect: + let blockFutures = collect: for i in 0 ..< batchSize: if not iter.finished: let address = BlockAddress.init(cid, iter.next()) if not (await address in self.networkStore) or fetchLocal: self.networkStore.getBlock(address) - let res = await allFinishedFailed(blocks) - if res.failure.len > 0: - trace "Some blocks failed to fetch", len = res.failure.len - return failure("Some blocks failed to fetch (" & $res.failure.len & " )") + without blockResults =? await allFinishedValues(blockFutures), err: + trace "Some blocks failed to fetch", err = err.msg + return failure(err) - if not onBatch.isNil and - batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption: + let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value) + + let numOfFailedBlocks = blockResults.len - blocks.len + if numOfFailedBlocks > 0: + return + failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")") + + if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption: return failure(batchErr) - await sleepAsync(1.millis) + if not iter.finished: + await sleepAsync(1.millis) success() diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 511badef..bd535336 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -30,6 +30,7 @@ import pkg/codex/discovery import pkg/codex/erasure import pkg/codex/merkletree import pkg/codex/blocktype as bt +import pkg/codex/rng import pkg/codex/node {.all.} @@ -78,6 +79,31 @@ asyncchecksuite "Test Node - Basic": ) ).tryGet() + test "Block Batching with corrupted blocks": + let blocks = await makeRandomBlocks(datasetSize = 64.KiBs.int, blockSize = 64.KiBs) + assert blocks.len == 1 + + let blk = blocks[0] + + # corrupt block + let pos = rng.Rng.instance.rand(blk.data.len - 1) + blk.data[pos] = byte 0 + + let manifest = await storeDataGetManifest(localStore, blocks) + + let batchSize = manifest.blocksCount + let res = ( + await node.fetchBatched( + manifest, + batchSize = batchSize, + proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} = + return failure("Should not be called"), + ) + ) + check res.isFailure + check res.error of CatchableError + check res.error.msg == "Some blocks failed (Result) to fetch (1)" + test "Should store Data Stream": let stream = BufferStream.new() From e9c6d198732874203755c473363b624562725df8 Mon Sep 17 00:00:00 2001 From: munna0908 <88337208+munna0908@users.noreply.github.com> Date: Mon, 31 Mar 2025 12:11:08 +0530 Subject: [PATCH 3/3] use constantine sha256 for codex tree hashing (#1168) --- codex/merkletree/codex/codex.nim | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index e287dfac..0eec92e4 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -15,7 +15,7 @@ import std/sequtils import pkg/questionable import pkg/questionable/results import pkg/libp2p/[cid, multicodec, multihash] - +import pkg/constantine/hashes import ../../utils import ../../rng import ../../errors @@ -132,9 +132,13 @@ func compress*(x, y: openArray[byte], key: ByteTreeKey, mhash: MHash): ?!ByteHas ## Compress two hashes ## - var digest = newSeq[byte](mhash.size) - mhash.coder(@x & @y & @[key.byte], digest) - success digest + # Using Constantine's SHA256 instead of mhash for optimal performance on 32-byte merkle node hashing + # See: https://github.com/codex-storage/nim-codex/issues/1162 + + let input = @x & @y & @[key.byte] + var digest = hashes.sha256.hash(input) + + success @digest func init*( _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, leaves: openArray[ByteHash]