Merge branch 'master' into logging/repostore-timing

This commit is contained in:
Ben Bierens 2025-04-01 10:27:01 +02:00 committed by GitHub
commit d7912a1f9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 78 additions and 15 deletions

View File

@ -8,6 +8,8 @@
## those terms.
import std/options
import std/sugar
import std/sequtils
import pkg/results
import pkg/chronos
@ -42,7 +44,9 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} =
else:
T.failure("Option is None")
proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.async.} =
proc allFinishedFailed*[T](
futs: seq[Future[T]]
): Future[FinishedFailed[T]] {.async: (raises: [CancelledError]).} =
## Check if all futures have finished or failed
##
## TODO: wip, not sure if we want this - at the minimum,
@ -57,3 +61,26 @@ proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.as
res.success.add f
return res
proc allFinishedValues*[T](
    futs: seq[Future[T]]
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
  ## If all futures have finished successfully, return their values;
  ## otherwise return a failure naming how many futures failed.
  ##
  ## Cancelled futures are neither "failed" nor "finished" here: they are
  ## skipped by the collect below, so a cancellation can shrink the result —
  ## NOTE(review): confirm callers expect that rather than a failure.
  # Wait for every future to settle (completed, failed or cancelled).
  await allFutures(futs)

  let numOfFailed = futs.countIt(it.failed)
  if numOfFailed > 0:
    # Fix: the original message had an unbalanced extra ')' — "(N))".
    return failure "Some futures failed (" & $numOfFailed & ")"

  # Here we know there are no failed futures in "futs";
  # we are only interested in those that completed successfully.
  let values = collect:
    for b in futs:
      if b.finished:
        b.value
  return success values

View File

@ -15,7 +15,7 @@ import std/sequtils
import pkg/questionable
import pkg/questionable/results
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/constantine/hashes
import ../../utils
import ../../rng
import ../../errors
@ -132,9 +132,13 @@ func compress*(x, y: openArray[byte], key: ByteTreeKey, mhash: MHash): ?!ByteHas
## Compress two hashes
##
var digest = newSeq[byte](mhash.size)
mhash.coder(@x & @y & @[key.byte], digest)
success digest
# Using Constantine's SHA256 instead of mhash for optimal performance on 32-byte merkle node hashing
# See: https://github.com/codex-storage/nim-codex/issues/1162
let input = @x & @y & @[key.byte]
var digest = hashes.sha256.hash(input)
success @digest
func init*(
_: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, leaves: openArray[ByteHash]

View File

@ -183,23 +183,29 @@ proc fetchBatched*(
# )
while not iter.finished:
let blocks = collect:
let blockFutures = collect:
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
let res = await allFinishedFailed(blocks)
if res.failure.len > 0:
trace "Some blocks failed to fetch", len = res.failure.len
return failure("Some blocks failed to fetch (" & $res.failure.len & " )")
without blockResults =? await allFinishedValues(blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)
if not onBatch.isNil and
batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption:
let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
let numOfFailedBlocks = blockResults.len - blocks.len
if numOfFailedBlocks > 0:
return
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
return failure(batchErr)
await sleepAsync(1.millis)
if not iter.finished:
await sleepAsync(1.millis)
success()

View File

@ -33,10 +33,10 @@ proc new*(
return 0
var read = 0
while read < len:
while read < len and (pad or read < size - consumed):
rng.shuffle(alpha)
for a in alpha:
if read >= len:
if read >= len or (not pad and read >= size - consumed):
break
data[read] = a

View File

@ -30,6 +30,7 @@ import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/merkletree
import pkg/codex/blocktype as bt
import pkg/codex/rng
import pkg/codex/node {.all.}
@ -78,6 +79,31 @@ asyncchecksuite "Test Node - Basic":
)
).tryGet()
# Verifies that fetchBatched surfaces a failure when a block in the batch
# is corrupted: the stored block's hash no longer matches its CID, so the
# fetch path reports it as failed and the onBatch callback is never invoked.
test "Block Batching with corrupted blocks":
# Build a dataset that fits in exactly one block, so corrupting that one
# block must fail the whole batch.
let blocks = await makeRandomBlocks(datasetSize = 64.KiBs.int, blockSize = 64.KiBs)
assert blocks.len == 1
let blk = blocks[0]
# corrupt block
# NOTE(review): zeroing a randomly chosen byte is a no-op if that byte was
# already 0, which would make this test flaky — consider flipping a bit
# (e.g. xor 1) instead to guarantee corruption.
let pos = rng.Rng.instance.rand(blk.data.len - 1)
blk.data[pos] = byte 0
let manifest = await storeDataGetManifest(localStore, blocks)
# Fetch the entire dataset as a single batch.
let batchSize = manifest.blocksCount
let res = (
await node.fetchBatched(
manifest,
batchSize = batchSize,
# The batch contains a corrupted block, so the success callback must
# never run; failing here pins that contract.
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
return failure("Should not be called"),
)
)
check res.isFailure
check res.error of CatchableError
# Message must match the one produced by fetchBatched's failed-block path.
check res.error.msg == "Some blocks failed (Result) to fetch (1)"
test "Should store Data Stream":
let
stream = BufferStream.new()