mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-05 15:03:07 +00:00
make sure we do not call "get" on unverified Result while fetching in batches (#1169)
* makes sure we do not call "get" on unverified result * make handling of failed blocks in fetchBatched even more explicit * simplifies allFinishedValues and makes it independent from allFinishedFailed * only sleep if not iter.finished in fetchBatched
This commit is contained in:
parent
0ec52abc98
commit
5ec3b2b027
@ -8,6 +8,8 @@
|
||||
## those terms.
|
||||
|
||||
import std/options
|
||||
import std/sugar
|
||||
import std/sequtils
|
||||
|
||||
import pkg/results
|
||||
import pkg/chronos
|
||||
@ -42,7 +44,9 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} =
|
||||
else:
|
||||
T.failure("Option is None")
|
||||
|
||||
proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.async.} =
|
||||
proc allFinishedFailed*[T](
|
||||
futs: seq[Future[T]]
|
||||
): Future[FinishedFailed[T]] {.async: (raises: [CancelledError]).} =
|
||||
## Check if all futures have finished or failed
|
||||
##
|
||||
## TODO: wip, not sure if we want this - at the minimum,
|
||||
@ -57,3 +61,26 @@ proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.as
|
||||
res.success.add f
|
||||
|
||||
return res
|
||||
|
||||
proc allFinishedValues*[T](
|
||||
futs: seq[Future[T]]
|
||||
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
|
||||
## If all futures have finished, return corresponding values,
|
||||
## otherwise return failure
|
||||
##
|
||||
|
||||
# wait for all futures to be either completed, failed or canceled
|
||||
await allFutures(futs)
|
||||
|
||||
let numOfFailed = futs.countIt(it.failed)
|
||||
|
||||
if numOfFailed > 0:
|
||||
return failure "Some futures failed (" & $numOfFailed & "))"
|
||||
|
||||
# here, we know there are no failed futures in "futs"
|
||||
# and we are only interested in those that completed successfully
|
||||
let values = collect:
|
||||
for b in futs:
|
||||
if b.finished:
|
||||
b.value
|
||||
return success values
|
||||
|
||||
@ -183,23 +183,29 @@ proc fetchBatched*(
|
||||
# )
|
||||
|
||||
while not iter.finished:
|
||||
let blocks = collect:
|
||||
let blockFutures = collect:
|
||||
for i in 0 ..< batchSize:
|
||||
if not iter.finished:
|
||||
let address = BlockAddress.init(cid, iter.next())
|
||||
if not (await address in self.networkStore) or fetchLocal:
|
||||
self.networkStore.getBlock(address)
|
||||
|
||||
let res = await allFinishedFailed(blocks)
|
||||
if res.failure.len > 0:
|
||||
trace "Some blocks failed to fetch", len = res.failure.len
|
||||
return failure("Some blocks failed to fetch (" & $res.failure.len & " )")
|
||||
without blockResults =? await allFinishedValues(blockFutures), err:
|
||||
trace "Some blocks failed to fetch", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if not onBatch.isNil and
|
||||
batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption:
|
||||
let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
|
||||
|
||||
let numOfFailedBlocks = blockResults.len - blocks.len
|
||||
if numOfFailedBlocks > 0:
|
||||
return
|
||||
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
|
||||
|
||||
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
|
||||
return failure(batchErr)
|
||||
|
||||
await sleepAsync(1.millis)
|
||||
if not iter.finished:
|
||||
await sleepAsync(1.millis)
|
||||
|
||||
success()
|
||||
|
||||
|
||||
@ -30,6 +30,7 @@ import pkg/codex/discovery
|
||||
import pkg/codex/erasure
|
||||
import pkg/codex/merkletree
|
||||
import pkg/codex/blocktype as bt
|
||||
import pkg/codex/rng
|
||||
|
||||
import pkg/codex/node {.all.}
|
||||
|
||||
@ -78,6 +79,31 @@ asyncchecksuite "Test Node - Basic":
|
||||
)
|
||||
).tryGet()
|
||||
|
||||
test "Block Batching with corrupted blocks":
|
||||
let blocks = await makeRandomBlocks(datasetSize = 64.KiBs.int, blockSize = 64.KiBs)
|
||||
assert blocks.len == 1
|
||||
|
||||
let blk = blocks[0]
|
||||
|
||||
# corrupt block
|
||||
let pos = rng.Rng.instance.rand(blk.data.len - 1)
|
||||
blk.data[pos] = byte 0
|
||||
|
||||
let manifest = await storeDataGetManifest(localStore, blocks)
|
||||
|
||||
let batchSize = manifest.blocksCount
|
||||
let res = (
|
||||
await node.fetchBatched(
|
||||
manifest,
|
||||
batchSize = batchSize,
|
||||
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
|
||||
return failure("Should not be called"),
|
||||
)
|
||||
)
|
||||
check res.isFailure
|
||||
check res.error of CatchableError
|
||||
check res.error.msg == "Some blocks failed (Result) to fetch (1)"
|
||||
|
||||
test "Should store Data Stream":
|
||||
let
|
||||
stream = BufferStream.new()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user