checked exceptions in stores (#1179)

* checked exceptions in stores

* makes asynciter as much exception safe as it gets

* introduce "SafeAsyncIter" that uses Results and limits exceptions to cancellations

* adds {.push raises: [].} to errors

* uses SafeAsyncIter in "listBlocks" and in "getBlockExpirations"

* simplifies safeasynciter (magic of auto)

* gets rid of ugly casts

* tiny fix in hte way we create raising futures in tests of safeasynciter

* Removes two more casts caused by using checked exceptions

* adds an extended explanation of one more complex SafeAsyncIter test

* adds missing "finishOnErr" param in slice constructor of SafeAsyncIter

* better fix for "Error: Exception can raise an unlisted exception: Exception" error.

---------

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
Marcin Czenko 2025-05-21 23:17:04 +02:00 committed by GitHub
parent bde98738c2
commit 748830570a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 1057 additions and 270 deletions

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
@ -81,16 +83,12 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError])
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
try:
while b.advertiserRunning:
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
except CatchableError as e:
error "Error in advertise local store loop", error = e.msgDetail
raiseAssert("Unexpected exception in advertiseLocalStoreLoop")
if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cidsIter:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
@ -126,8 +124,11 @@ proc start*(b: Advertiser) {.async: (raises: []).} =
trace "Advertiser start"
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)
proc onBlock(cid: Cid) {.async: (raises: []).} =
try:
await b.advertiseBlock(cid)
except CancelledError:
trace "Cancelled advertise block", cid
doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some

View File

@ -202,8 +202,8 @@ proc downloadInternal(
trace "Block download cancelled"
if not handle.finished:
await handle.cancelAndWait()
except CatchableError as exc:
warn "Error downloadloading block", exc = exc.msg
except RetriesExhaustedError as exc:
warn "Retries exhausted for block", address, exc = exc.msg
if not handle.finished:
handle.fail(exc)
finally:
@ -309,7 +309,7 @@ proc cancelBlocks(
return peerCtx
try:
let (succeededFuts, failedFuts) = await allFinishedFailed(
let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx](
toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map(
processPeer
)

View File

@ -177,7 +177,7 @@ proc start*(s: CodexServer) {.async.} =
proc stop*(s: CodexServer) {.async.} =
notice "Stopping codex node"
let res = await noCancel allFinishedFailed(
let res = await noCancel allFinishedFailed[void](
@[
s.restServer.stop(),
s.codexNode.switch.stop(),

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import std/options
import std/sugar
import std/sequtils
@ -45,7 +47,7 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} =
T.failure("Option is None")
proc allFinishedFailed*[T](
futs: seq[Future[T]]
futs: auto
): Future[FinishedFailed[T]] {.async: (raises: [CancelledError]).} =
## Check if all futures have finished or failed
##
@ -63,7 +65,7 @@ proc allFinishedFailed*[T](
return res
proc allFinishedValues*[T](
futs: seq[Future[T]]
futs: auto
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
## If all futures have finished, return corresponding values,
## otherwise return failure

View File

@ -78,7 +78,9 @@ type
CodexNodeRef* = ref CodexNode
OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].}
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError])
.}
func switch*(self: CodexNodeRef): Switch =
return self.switch
@ -109,7 +111,9 @@ proc storeManifest*(
success blk
proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
proc fetchManifest*(
self: CodexNodeRef, cid: Cid
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Fetch and decode a manifest block
##
@ -144,7 +148,7 @@ proc connect*(
proc updateExpiry*(
self: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without manifest =? await self.fetchManifest(manifestCid), error:
trace "Unable to fetch manifest for cid", manifestCid
return failure(error)
@ -154,7 +158,7 @@ proc updateExpiry*(
self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry)
)
let res = await allFinishedFailed(ensuringFutures)
let res = await allFinishedFailed[?!void](ensuringFutures)
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
@ -172,7 +176,7 @@ proc fetchBatched*(
batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil,
fetchLocal = true,
): Future[?!void] {.async, gcsafe.} =
): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.} =
## Fetch blocks in batches of `batchSize`
##
@ -190,7 +194,10 @@ proc fetchBatched*(
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
without blockResults =? await allFinishedValues(blockFutures), err:
if blockFutures.len == 0:
continue
without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)
@ -215,7 +222,7 @@ proc fetchBatched*(
batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil,
fetchLocal = true,
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
## Fetch manifest in batches of `batchSize`
##
@ -240,8 +247,6 @@ proc fetchDatasetAsync*(
error "Unable to fetch blocks", err = err.msg
except CancelledError as exc:
trace "Cancelled fetching blocks", exc = exc.msg
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
## Start fetching a dataset in the background.
@ -249,7 +254,9 @@ proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
##
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} =
proc streamSingleBlock(
self: CodexNodeRef, cid: Cid
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of a single block.
##
trace "Streaming single block", cid = cid
@ -264,7 +271,9 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async
defer:
await stream.pushEof()
await stream.pushData(blk.data)
except CatchableError as exc:
except CancelledError as exc:
trace "Streaming block cancelled", cid, exc = exc.msg
except LPStreamError as exc:
trace "Unable to send block", cid, exc = exc.msg
self.trackedFutures.track(streamOneBlock())
@ -272,7 +281,7 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async
proc streamEntireDataset(
self: CodexNodeRef, manifest: Manifest, manifestCid: Cid
): Future[?!LPStream] {.async.} =
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of the entire dataset described by the manifest.
##
trace "Retrieving blocks from manifest", manifestCid
@ -294,14 +303,14 @@ proc streamEntireDataset(
jobs.add(erasureJob())
jobs.add(self.fetchDatasetAsync(manifest))
jobs.add(self.fetchDatasetAsync(manifest, fetchLocal = false))
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CatchableError as exc:
warn "Stream failed", exc = exc.msg
except CancelledError as exc:
warn "Stream cancelled", exc = exc.msg
finally:
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
@ -314,7 +323,7 @@ proc streamEntireDataset(
proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async.} =
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Retrieve by Cid a single block or an entire dataset described by manifest
##
@ -470,11 +479,11 @@ proc store*(
return manifestBlk.cid.success
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
without cids =? await self.networkStore.listBlocks(BlockType.Manifest):
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks"
return
for c in cids:
for c in cidsIter:
if cid =? await c:
without blk =? await self.networkStore.getBlock(cid):
warn "Failed to get manifest block by cid", cid
@ -617,7 +626,7 @@ proc onStore(
slotIdx: uint64,
blocksCb: BlocksCb,
isRepairing: bool = false,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## store data in local storage
##
@ -648,13 +657,15 @@ proc onStore(
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))
proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
proc updateExpiry(
blocks: seq[bt.Block]
): Future[?!void] {.async: (raises: [CancelledError]).} =
trace "Updating expiry for blocks", blocks = blocks.len
let ensureExpiryFutures =
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))
let res = await allFinishedFailed(ensureExpiryFutures)
let res = await allFinishedFailed[?!void](ensureExpiryFutures)
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
@ -702,7 +713,7 @@ proc onStore(
proc onProve(
self: CodexNodeRef, slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
## Generats a proof for a given slot and challenge
##
@ -758,7 +769,7 @@ proc onProve(
proc onExpiryUpdate(
self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await self.updateExpiry(rootCid, expiry)
proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: uint64) =
@ -781,12 +792,12 @@ proc start*(self: CodexNodeRef) {.async.} =
slot: uint64,
onBatch: BatchProc,
isRepairing: bool = false,
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.onStore(request, slot, onBatch, isRepairing)
hostContracts.sales.onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.onExpiryUpdate(rootCid, expiry)
hostContracts.sales.onClear = proc(request: StorageRequest, slotIndex: uint64) =
@ -795,7 +806,7 @@ proc start*(self: CodexNodeRef) {.async.} =
hostContracts.sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] =
): Future[?!Groth16Proof] {.async: (raw: true, raises: [CancelledError]).} =
# TODO: generate proof
self.onProve(slot, challenge)

View File

@ -78,7 +78,7 @@ proc retrieveCid(
## manner
##
var stream: LPStream
var lpStream: LPStream
var bytes = 0
try:
@ -94,6 +94,8 @@ proc retrieveCid(
await resp.sendBody(error.msg)
return
lpStream = stream
# It is ok to fetch again the manifest because it will hit the cache
without manifest =? (await node.fetchManifest(cid)), err:
error "Failed to fetch manifest", err = err.msg
@ -139,15 +141,15 @@ proc retrieveCid(
codex_api_downloads.inc()
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
warn "Error streaming blocks", exc = exc.msg
resp.status = Http500
if resp.isPending():
await resp.sendBody(exc.msg)
finally:
info "Sent bytes", cid = cid, bytes
if not stream.isNil:
await stream.close()
if not lpStream.isNil:
await lpStream.close()
proc buildCorsHeaders(
httpMethod: string, allowedOrigin: Option[string]

View File

@ -24,15 +24,17 @@ type
slotQueue*: SlotQueue
simulateProofFailures*: int
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError])
.}
OnStore* = proc(
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
): Future[?!void] {.gcsafe, upraises: [].}
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
gcsafe, upraises: []
gcsafe, async: (raises: [CancelledError])
.}
OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {.
gcsafe, upraises: []
gcsafe, async: (raises: [CancelledError])
.}
OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}

View File

@ -55,7 +55,9 @@ method run*(
reservationId = reservation.id
availabilityId = reservation.availabilityId
proc onBlocks(blocks: seq[bt.Block]): Future[?!void] {.async.} =
proc onBlocks(
blocks: seq[bt.Block]
): Future[?!void] {.async: (raises: [CancelledError]).} =
# release batches of blocks as they are written to disk and
# update availability size
var bytes: uint = 0

View File

@ -134,7 +134,7 @@ func manifest*[T, H](self: SlotsBuilder[T, H]): Manifest =
proc buildBlockTree*[T, H](
self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural
): Future[?!(seq[byte], T)] {.async.} =
): Future[?!(seq[byte], T)] {.async: (raises: [CancelledError]).} =
## Build the block digest tree and return a tuple with the
## block data and the tree.
##
@ -167,7 +167,7 @@ proc buildBlockTree*[T, H](
proc getCellHashes*[T, H](
self: SlotsBuilder[T, H], slotIndex: Natural
): Future[?!seq[H]] {.async.} =
): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} =
## Collect all the cells from a block and return
## their hashes.
##
@ -202,19 +202,23 @@ proc getCellHashes*[T, H](
proc buildSlotTree*[T, H](
self: SlotsBuilder[T, H], slotIndex: Natural
): Future[?!T] {.async.} =
): Future[?!T] {.async: (raises: [CancelledError]).} =
## Build the slot tree from the block digest hashes
## and return the tree.
without cellHashes =? (await self.getCellHashes(slotIndex)), err:
error "Failed to select slot blocks", err = err.msg
return failure(err)
try:
without cellHashes =? (await self.getCellHashes(slotIndex)), err:
error "Failed to select slot blocks", err = err.msg
return failure(err)
T.init(cellHashes)
T.init(cellHashes)
except IndexingError as err:
error "Failed to build slot tree", err = err.msg
return failure(err)
proc buildSlot*[T, H](
self: SlotsBuilder[T, H], slotIndex: Natural
): Future[?!H] {.async.} =
): Future[?!H] {.async: (raises: [CancelledError]).} =
## Build a slot tree and store the proofs in
## the block store.
##
@ -250,7 +254,9 @@ proc buildSlot*[T, H](
func buildVerifyTree*[T, H](self: SlotsBuilder[T, H], slotRoots: openArray[H]): ?!T =
T.init(@slotRoots)
proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
proc buildSlots*[T, H](
self: SlotsBuilder[T, H]
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Build all slot trees and store them in the block store.
##
@ -280,7 +286,9 @@ proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
success()
proc buildManifest*[T, H](self: SlotsBuilder[T, H]): Future[?!Manifest] {.async.} =
proc buildManifest*[T, H](
self: SlotsBuilder[T, H]
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
if err =? (await self.buildSlots()).errorOption:
error "Failed to build slot roots", err = err.msg
return failure(err)

View File

@ -50,7 +50,7 @@ type
proc prove*(
self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge
): Future[?!(AnyProofInputs, AnyProof)] {.async.} =
): Future[?!(AnyProofInputs, AnyProof)] {.async: (raises: [CancelledError]).} =
## Prove a statement using backend.
## Returns a future that resolves to a proof.

View File

@ -48,7 +48,7 @@ func getCell*[T, H](
proc getSample*[T, H](
self: DataSampler[T, H], cellIdx: int, slotTreeCid: Cid, slotRoot: H
): Future[?!Sample[H]] {.async.} =
): Future[?!Sample[H]] {.async: (raises: [CancelledError]).} =
let
cellsPerBlock = self.builder.numBlockCells
blkCellIdx = cellIdx.toCellInBlk(cellsPerBlock) # block cell index
@ -81,7 +81,7 @@ proc getSample*[T, H](
proc getProofInput*[T, H](
self: DataSampler[T, H], entropy: ProofChallenge, nSamples: Natural
): Future[?!ProofInputs[H]] {.async.} =
): Future[?!ProofInputs[H]] {.async: (raises: [CancelledError]).} =
## Generate proofs as input to the proving circuit.
##

View File

@ -7,10 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push:
{.upraises: [].}
{.push raises: [].}
import pkg/chronos
import pkg/libp2p
@ -32,11 +29,13 @@ type
Block
Both
CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, raises: [].}
CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, async: (raises: []).}
BlockStore* = ref object of RootObj
onBlockStored*: ?CidCallback
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base, gcsafe.} =
method getBlock*(
self: BlockStore, cid: Cid
): Future[?!Block] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get a block from the blockstore
##
@ -44,20 +43,23 @@ method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base, gcsafe.} =
method getBlock*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!Block] {.base, gcsafe.} =
): Future[?!Block] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get a block from the blockstore
##
raiseAssert("getBlock by treecid not implemented!")
method getCid*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!Cid] {.base.} =
method getCid*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!Cid] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get a cid given a tree and index
##
raiseAssert("getCid by treecid not implemented!")
method getBlock*(
self: BlockStore, address: BlockAddress
): Future[?!Block] {.base, gcsafe.} =
): Future[?!Block] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get a block from the blockstore
##
@ -65,7 +67,7 @@ method getBlock*(
method getBlockAndProof*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!(Block, CodexProof)] {.base, gcsafe.} =
): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
##
@ -73,7 +75,7 @@ method getBlockAndProof*(
method putBlock*(
self: BlockStore, blk: Block, ttl = Duration.none
): Future[?!void] {.base, gcsafe.} =
): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Put a block to the blockstore
##
@ -81,7 +83,7 @@ method putBlock*(
method putCidAndProof*(
self: BlockStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
): Future[?!void] {.base, gcsafe.} =
): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Put a block proof to the blockstore
##
@ -89,7 +91,7 @@ method putCidAndProof*(
method getCidAndProof*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!(Cid, CodexProof)] {.base, gcsafe.} =
): Future[?!(Cid, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get a block proof from the blockstore
##
@ -97,7 +99,7 @@ method getCidAndProof*(
method ensureExpiry*(
self: BlockStore, cid: Cid, expiry: SecondsSince1970
): Future[?!void] {.base, gcsafe.} =
): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
@ -106,14 +108,16 @@ method ensureExpiry*(
method ensureExpiry*(
self: BlockStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970
): Future[?!void] {.base, gcsafe.} =
): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
raiseAssert("Not implemented!")
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base, gcsafe.} =
method delBlock*(
self: BlockStore, cid: Cid
): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Delete a block from the blockstore
##
@ -121,13 +125,15 @@ method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base, gcsafe.} =
method delBlock*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!void] {.base, gcsafe.} =
): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Delete a block from the blockstore
##
raiseAssert("delBlock not implemented!")
method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base, gcsafe.} =
method hasBlock*(
self: BlockStore, cid: Cid
): Future[?!bool] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Check if the block exists in the blockstore
##
@ -135,7 +141,7 @@ method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base, gcsafe.} =
method hasBlock*(
self: BlockStore, tree: Cid, index: Natural
): Future[?!bool] {.base, gcsafe.} =
): Future[?!bool] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Check if the block exists in the blockstore
##
@ -143,27 +149,31 @@ method hasBlock*(
method listBlocks*(
self: BlockStore, blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.base, gcsafe.} =
): Future[?!SafeAsyncIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
raiseAssert("listBlocks not implemented!")
method close*(self: BlockStore): Future[void] {.base, gcsafe.} =
method close*(self: BlockStore): Future[void] {.base, async: (raises: []), gcsafe.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
raiseAssert("close not implemented!")
proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} =
proc contains*(
self: BlockStore, blk: Cid
): Future[bool] {.async: (raises: [CancelledError]), gcsafe.} =
## Check if the block exists in the blockstore.
## Return false if error encountered
##
return (await self.hasBlock(blk)) |? false
proc contains*(self: BlockStore, address: BlockAddress): Future[bool] {.async.} =
proc contains*(
self: BlockStore, address: BlockAddress
): Future[bool] {.async: (raises: [CancelledError]), gcsafe.} =
return
if address.leaf:
(await self.hasBlock(address.treeCid, address.index)) |? false

View File

@ -7,10 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push:
{.upraises: [].}
{.push raises: [].}
import std/options
@ -46,7 +43,9 @@ type
const DefaultCacheSize*: NBytes = 5.MiBs
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
method getBlock*(
self: CacheStore, cid: Cid
): Future[?!Block] {.async: (raises: [CancelledError]).} =
## Get a block from the stores
##
@ -69,7 +68,7 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
method getCidAndProof*(
self: CacheStore, treeCid: Cid, index: Natural
): Future[?!(Cid, CodexProof)] {.async.} =
): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} =
if cidAndProof =? self.cidAndProofCache.getOption((treeCid, index)):
success(cidAndProof)
else:
@ -81,7 +80,7 @@ method getCidAndProof*(
method getBlock*(
self: CacheStore, treeCid: Cid, index: Natural
): Future[?!Block] {.async.} =
): Future[?!Block] {.async: (raises: [CancelledError]).} =
without cidAndProof =? (await self.getCidAndProof(treeCid, index)), err:
return failure(err)
@ -89,7 +88,7 @@ method getBlock*(
method getBlockAndProof*(
self: CacheStore, treeCid: Cid, index: Natural
): Future[?!(Block, CodexProof)] {.async.} =
): Future[?!(Block, CodexProof)] {.async: (raises: [CancelledError]).} =
without cidAndProof =? (await self.getCidAndProof(treeCid, index)), err:
return failure(err)
@ -100,13 +99,17 @@ method getBlockAndProof*(
success((blk, proof))
method getBlock*(self: CacheStore, address: BlockAddress): Future[?!Block] =
method getBlock*(
self: CacheStore, address: BlockAddress
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
method hasBlock*(
self: CacheStore, cid: Cid
): Future[?!bool] {.async: (raises: [CancelledError]).} =
## Check if the block exists in the blockstore
##
@ -119,7 +122,7 @@ method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
method hasBlock*(
self: CacheStore, treeCid: Cid, index: Natural
): Future[?!bool] {.async.} =
): Future[?!bool] {.async: (raises: [CancelledError]).} =
without cidAndProof =? (await self.getCidAndProof(treeCid, index)), err:
if err of BlockNotFoundError:
return success(false)
@ -128,7 +131,7 @@ method hasBlock*(
await self.hasBlock(cidAndProof[0])
func cids(self: CacheStore): (iterator (): Cid {.gcsafe.}) =
func cids(self: CacheStore): (iterator (): Cid {.gcsafe, raises: [].}) =
return
iterator (): Cid =
for cid in self.cache.keys:
@ -136,7 +139,7 @@ func cids(self: CacheStore): (iterator (): Cid {.gcsafe.}) =
method listBlocks*(
self: CacheStore, blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
@ -145,12 +148,15 @@ method listBlocks*(
proc isFinished(): bool =
return finished(cids)
proc genNext(): Future[Cid] {.async.} =
cids()
proc genNext(): Future[?!Cid] {.async: (raises: [CancelledError]).} =
success(cids())
let iter = await (
AsyncIter[Cid].new(genNext, isFinished).filter(
proc(cid: Cid): Future[bool] {.async.} =
SafeAsyncIter[Cid].new(genNext, isFinished).filter(
proc(cid: ?!Cid): Future[bool] {.async: (raises: [CancelledError]).} =
without cid =? cid, err:
trace "Cannot get Cid from the iterator", err = err.msg
return false
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return false
@ -164,14 +170,7 @@ method listBlocks*(
return not isManifest
)
)
return success(
map[Cid, ?Cid](
iter,
proc(cid: Cid): Future[?Cid] {.async.} =
some(cid),
)
)
success(iter)
func putBlockSync(self: CacheStore, blk: Block): bool =
let blkSize = blk.data.len.NBytes # in bytes
@ -196,7 +195,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =
method putBlock*(
self: CacheStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Put a block to the blockstore
##
@ -213,13 +212,13 @@ method putBlock*(
method putCidAndProof*(
self: CacheStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
success()
method ensureExpiry*(
self: CacheStore, cid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Updates block's assosicated TTL in store - not applicable for CacheStore
##
@ -227,13 +226,15 @@ method ensureExpiry*(
method ensureExpiry*(
self: CacheStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Updates block's associated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
method delBlock*(
self: CacheStore, cid: Cid
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Delete a block from the blockstore
##
@ -250,7 +251,7 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
method delBlock*(
self: CacheStore, treeCid: Cid, index: Natural
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
let maybeRemoved = self.cidAndProofCache.del((treeCid, index))
if removed =? maybeRemoved:
@ -258,7 +259,7 @@ method delBlock*(
return success()
method close*(self: CacheStore): Future[void] {.async.} =
method close*(self: CacheStore): Future[void] {.async: (raises: []).} =
## Close the blockstore, a no-op for this implementation
##

View File

@ -10,13 +10,15 @@
## Store maintenance module
## Looks for and removes expired blocks from blockstores.
{.push raises: [].}
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./repostore
import ../utils/timer
import ../utils/asynciter
import ../utils/safeasynciter
import ../clock
import ../logutils
import ../systemclock
@ -54,19 +56,23 @@ proc new*(
offset: 0,
)
proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} =
proc deleteExpiredBlock(
self: BlockMaintainer, cid: Cid
): Future[void] {.async: (raises: [CancelledError]).} =
if isErr (await self.repoStore.delBlock(cid)):
trace "Unable to delete block from repoStore"
proc processBlockExpiration(
self: BlockMaintainer, be: BlockExpiration
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError]).} =
if be.expiry < self.clock.now:
await self.deleteExpiredBlock(be.cid)
else:
inc self.offset
proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
proc runBlockCheck(
self: BlockMaintainer
): Future[void] {.async: (raises: [CancelledError]).} =
let expirations = await self.repoStore.getBlockExpirations(
maxNumber = self.numberOfBlocksPerInterval, offset = self.offset
)
@ -77,7 +83,9 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
var numberReceived = 0
for beFut in iter:
let be = await beFut
without be =? (await beFut), err:
trace "Unable to obtain blockExpiration from iterator"
continue
inc numberReceived
await self.processBlockExpiration(be)
await sleepAsync(1.millis) # cooperative scheduling
@ -88,15 +96,14 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
self.offset = 0
proc start*(self: BlockMaintainer) =
proc onTimer(): Future[void] {.async.} =
proc onTimer(): Future[void] {.async: (raises: []).} =
try:
await self.runBlockCheck()
except CancelledError as error:
raise error
except CatchableError as exc:
error "Unexpected exception in BlockMaintainer.onTimer(): ", msg = exc.msg
except CancelledError as err:
trace "Running block check in block maintenance timer callback cancelled: ",
err = err.msg
self.timer.start(onTimer, self.interval)
proc stop*(self: BlockMaintainer): Future[void] {.async.} =
proc stop*(self: BlockMaintainer): Future[void] {.async: (raises: []).} =
await self.timer.stop()

View File

@ -19,7 +19,7 @@ import ../blockexchange
import ../logutils
import ../merkletree
import ../utils/asyncheapqueue
import ../utils/asynciter
import ../utils/safeasynciter
import ./blockstore
export blockstore, blockexchange, asyncheapqueue
@ -31,7 +31,9 @@ type NetworkStore* = ref object of BlockStore
engine*: BlockExcEngine # blockexc decision engine
localStore*: BlockStore # local block store
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
method getBlock*(
self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
without blk =? (await self.localStore.getBlock(address)), err:
if not (err of BlockNotFoundError):
error "Error getting block from local store", address, err = err.msg
@ -45,13 +47,17 @@ method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.a
return success blk
method getBlock*(self: NetworkStore, cid: Cid): Future[?!Block] =
method getBlock*(
self: NetworkStore, cid: Cid
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
## Get a block from the blockstore
##
self.getBlock(BlockAddress.init(cid))
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Block] =
method getBlock*(
self: NetworkStore, treeCid: Cid, index: Natural
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
## Get a block from the blockstore
##
@ -59,7 +65,7 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo
method putBlock*(
self: NetworkStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Store block locally and notify the network
##
let res = await self.localStore.putBlock(blk, ttl)
@ -71,12 +77,12 @@ method putBlock*(
method putCidAndProof*(
self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.localStore.putCidAndProof(treeCid, index, blockCid, proof)
method getCidAndProof*(
self: NetworkStore, treeCid: Cid, index: Natural
): Future[?!(Cid, CodexProof)] =
): Future[?!(Cid, CodexProof)] {.async: (raw: true, raises: [CancelledError]).} =
## Get a block proof from the blockstore
##
@ -84,7 +90,7 @@ method getCidAndProof*(
method ensureExpiry*(
self: NetworkStore, cid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
@ -101,7 +107,7 @@ method ensureExpiry*(
method ensureExpiry*(
self: NetworkStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
@ -118,10 +124,12 @@ method ensureExpiry*(
method listBlocks*(
self: NetworkStore, blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] =
): Future[?!SafeAsyncIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} =
self.localStore.listBlocks(blockType)
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
method delBlock*(
self: NetworkStore, cid: Cid
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
## Delete a block from the blockstore
##
@ -130,7 +138,9 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
{.pop.}
method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
method hasBlock*(
self: NetworkStore, cid: Cid
): Future[?!bool] {.async: (raises: [CancelledError]).} =
## Check if the block exists in the blockstore
##
@ -139,13 +149,13 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
method hasBlock*(
self: NetworkStore, tree: Cid, index: Natural
): Future[?!bool] {.async.} =
): Future[?!bool] {.async: (raises: [CancelledError]).} =
## Check if the block exists in the blockstore
##
trace "Checking network store for block existence", tree, index
return await self.localStore.hasBlock(tree, index)
method close*(self: NetworkStore): Future[void] {.async.} =
method close*(self: NetworkStore): Future[void] {.async: (raises: []).} =
## Close the underlying local blockstore
##

View File

@ -5,12 +5,15 @@ import pkg/chronicles
import pkg/datastore/typedds
import ../utils/asynciter
import ../utils/safeasynciter
{.push raises: [].}
type KeyVal*[T] = tuple[key: Key, value: T]
proc toAsyncIter*[T](
queryIter: QueryIter[T], finishOnErr: bool = true
): Future[?!AsyncIter[?!QueryResponse[T]]] {.async.} =
): Future[?!AsyncIter[?!QueryResponse[T]]] {.async: (raises: [CancelledError]).} =
## Converts `QueryIter[T]` to `AsyncIter[?!QueryResponse[T]]` and automatically
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
## if the flag finishOnErr is set to true)
@ -42,9 +45,43 @@ proc toAsyncIter*[T](
AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success
proc toSafeAsyncIter*[T](
queryIter: QueryIter[T], finishOnErr: bool = true
): Future[?!SafeAsyncIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} =
## Converts `QueryIter[T]` to `SafeAsyncIter[QueryResponse[T]]` and automatically
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
## if the flag finishOnErr is set to true)
##
if queryIter.finished:
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return success(SafeAsyncIter[QueryResponse[T]].empty())
var errOccurred = false
proc genNext(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
let queryResOrErr = await queryIter.next()
if queryResOrErr.isErr:
errOccurred = true
if queryIter.finished or (errOccurred and finishOnErr):
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return queryResOrErr
proc isFinished(): bool =
queryIter.finished
SafeAsyncIter[QueryResponse[T]].new(genNext, isFinished).success
proc filterSuccess*[T](
iter: AsyncIter[?!QueryResponse[T]]
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async.} =
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async: (raises: [CancelledError]).} =
## Filters out any items that are not success
proc mapping(resOrErr: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} =
@ -63,3 +100,29 @@ proc filterSuccess*[T](
(key: key, value: value).some
await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping)
proc filterSuccess*[T](
iter: SafeAsyncIter[QueryResponse[T]]
): Future[SafeAsyncIter[tuple[key: Key, value: T]]] {.
async: (raises: [CancelledError])
.} =
## Filters out any items that are not success
proc mapping(
resOrErr: ?!QueryResponse[T]
): Future[Option[?!KeyVal[T]]] {.async: (raises: [CancelledError]).} =
without res =? resOrErr, error:
error "Error occurred when getting QueryResponse", msg = error.msg
return Result[KeyVal[T], ref CatchableError].none
without key =? res.key:
warn "No key for a QueryResponse"
return Result[KeyVal[T], ref CatchableError].none
without value =? res.value, error:
error "Error occurred when getting a value from QueryResponse", msg = error.msg
return Result[KeyVal[T], ref CatchableError].none
some(success((key: key, value: value)))
await mapFilter[QueryResponse[T], KeyVal[T]](iter, mapping)

View File

@ -34,7 +34,7 @@ declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
proc putLeafMetadata*(
self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof
): Future[?!StoreResultKind] {.async.} =
): Future[?!StoreResultKind] {.async: (raises: [CancelledError]).} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
@ -59,7 +59,7 @@ proc putLeafMetadata*(
proc delLeafMetadata*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
@ -70,7 +70,7 @@ proc delLeafMetadata*(
proc getLeafMetadata*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!LeafMetadata] {.async.} =
): Future[?!LeafMetadata] {.async: (raises: [CancelledError]).} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
@ -84,7 +84,7 @@ proc getLeafMetadata*(
proc updateTotalBlocksCount*(
self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.metaDs.modify(
CodexTotalBlocksKey,
proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
@ -139,7 +139,7 @@ proc updateBlockMetadata*(
plusRefCount: Natural = 0,
minusRefCount: Natural = 0,
minExpiry: SecondsSince1970 = 0,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
if cid.isEmpty:
return success()
@ -163,7 +163,7 @@ proc updateBlockMetadata*(
proc storeBlock*(
self: RepoStore, blk: Block, minExpiry: SecondsSince1970
): Future[?!StoreResult] {.async.} =
): Future[?!StoreResult] {.async: (raises: [CancelledError]).} =
if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore))
@ -189,7 +189,7 @@ proc storeBlock*(
)
res = StoreResult(kind: AlreadyInStore)
# making sure that the block acutally is stored in the repoDs
# making sure that the block actually is stored in the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
@ -215,7 +215,7 @@ proc storeBlock*(
proc tryDeleteBlock*(
self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low
): Future[?!DeleteResult] {.async.} =
): Future[?!DeleteResult] {.async: (raises: [CancelledError]).} =
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
@ -36,7 +38,9 @@ logScope:
# BlockStore API
###########################################################
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
method getBlock*(
self: RepoStore, cid: Cid
): Future[?!Block] {.async: (raises: [CancelledError]).} =
## Get a block from the blockstore
##
@ -63,7 +67,7 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
method getBlockAndProof*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!(Block, CodexProof)] {.async.} =
): Future[?!(Block, CodexProof)] {.async: (raises: [CancelledError]).} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
@ -74,13 +78,15 @@ method getBlockAndProof*(
method getBlock*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!Block] {.async.} =
): Future[?!Block] {.async: (raises: [CancelledError]).} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.getBlock(leafMd.blkCid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
method getBlock*(
self: RepoStore, address: BlockAddress
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
## Get a block from the blockstore
##
@ -91,7 +97,7 @@ method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
method ensureExpiry*(
self: RepoStore, cid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
@ -104,7 +110,7 @@ method ensureExpiry*(
method ensureExpiry*(
self: RepoStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
@ -116,7 +122,7 @@ method ensureExpiry*(
method putCidAndProof*(
self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Put a block to the blockstore
##
@ -142,13 +148,15 @@ method putCidAndProof*(
method getCidAndProof*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!(Cid, CodexProof)] {.async.} =
): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success((leafMd.blkCid, leafMd.proof))
method getCid*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Cid] {.async.} =
method getCid*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!Cid] {.async: (raises: [CancelledError]).} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
@ -156,7 +164,7 @@ method getCid*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Cid] {.a
method putBlock*(
self: RepoStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Put a block to the blockstore
##
@ -186,7 +194,9 @@ method putBlock*(
return success()
proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.async.} =
proc delBlockInternal(
self: RepoStore, cid: Cid
): Future[?!DeleteResultKind] {.async: (raises: [CancelledError]).} =
logScope:
cid = cid
@ -208,7 +218,9 @@ proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.a
success(res.kind)
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
method delBlock*(
self: RepoStore, cid: Cid
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Delete a block from the blockstore when block refCount is 0 or block is expired
##
@ -230,7 +242,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
method delBlock*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success()
@ -251,7 +263,9 @@ method delBlock*(
success()
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
method hasBlock*(
self: RepoStore, cid: Cid
): Future[?!bool] {.async: (raises: [CancelledError]).} =
## Check if the block exists in the blockstore
##
@ -270,7 +284,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
method hasBlock*(
self: RepoStore, treeCid: Cid, index: Natural
): Future[?!bool] {.async.} =
): Future[?!bool] {.async: (raises: [CancelledError]).} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
@ -281,12 +295,12 @@ method hasBlock*(
method listBlocks*(
self: RepoStore, blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var iter = AsyncIter[?Cid]()
var iter = SafeAsyncIter[Cid]()
let key =
case blockType
@ -299,7 +313,7 @@ method listBlocks*(
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
proc next(): Future[?!Cid] {.async: (raises: [CancelledError]).} =
await idleAsync()
if queryIter.finished:
iter.finish
@ -307,9 +321,9 @@ method listBlocks*(
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
return Cid.init(cid.value).mapFailure
else:
return Cid.none
return Cid.failure("No or invalid Cid")
iter.next = next
return success iter
@ -332,7 +346,9 @@ proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} =
method getBlockExpirations*(
self: RepoStore, maxNumber: int, offset: int
): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =
): Future[?!SafeAsyncIter[BlockExpiration]] {.
async: (raises: [CancelledError]), base, gcsafe
.} =
## Get iterator with block expirations
##
@ -344,26 +360,30 @@ method getBlockExpirations*(
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
without asyncQueryIter =? (await queryIter.toSafeAsyncIter()), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let filteredIter: AsyncIter[KeyVal[BlockMetadata]] =
let filteredIter: SafeAsyncIter[KeyVal[BlockMetadata]] =
await asyncQueryIter.filterSuccess()
proc mapping(kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} =
proc mapping(
kvRes: ?!KeyVal[BlockMetadata]
): Future[Option[?!BlockExpiration]] {.async: (raises: [CancelledError]).} =
without kv =? kvRes, err:
error "Error occurred when getting KeyVal", err = err.msg
return Result[BlockExpiration, ref CatchableError].none
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return BlockExpiration.none
return Result[BlockExpiration, ref CatchableError].none
BlockExpiration(cid: cid, expiry: kv.value.expiry).some
some(success(BlockExpiration(cid: cid, expiry: kv.value.expiry)))
let blockExpIter =
await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter, mapping)
success(blockExpIter)
method close*(self: RepoStore): Future[void] {.async.} =
method close*(self: RepoStore): Future[void] {.async: (raises: []).} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
@ -371,10 +391,13 @@ method close*(self: RepoStore): Future[void] {.async.} =
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
try:
(await noCancel self.metaDs.close()).expect("Should meta datastore")
except CatchableError as err:
error "Failed to close meta datastore", err = err.msg
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
(await noCancel self.repoDs.close()).expect("Should repo datastore")
###########################################################
# RepoStore procs
@ -400,7 +423,9 @@ proc release*(
await self.updateQuotaUsage(minusReserved = bytes)
proc start*(self: RepoStore): Future[void] {.async.} =
proc start*(
self: RepoStore
): Future[void] {.async: (raises: [CancelledError, CodexError]).} =
## Start repo
##
@ -417,7 +442,7 @@ proc start*(self: RepoStore): Future[void] {.async.} =
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} =
## Stop repo
##
if not self.started:

View File

@ -125,7 +125,7 @@ method readOnce*(
return read
method closeImpl*(self: StoreStream) {.async.} =
method closeImpl*(self: StoreStream) {.async: (raises: []).} =
trace "Closing StoreStream"
self.offset = self.size # set Eof
await procCall LPStream(self).closeImpl()

View File

@ -8,6 +8,8 @@
## those terms.
##
{.push raises: [].}
import std/enumerate
import std/parseutils
import std/options
@ -17,8 +19,9 @@ import pkg/chronos
import ./utils/asyncheapqueue
import ./utils/fileutils
import ./utils/asynciter
import ./utils/safeasynciter
export asyncheapqueue, fileutils, asynciter, chronos
export asyncheapqueue, fileutils, asynciter, safeasynciter, chronos
when defined(posix):
import os, posix

View File

@ -123,10 +123,10 @@ proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
proc mapFilter*[T, U](
iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]]
): Future[AsyncIter[U]] {.async.} =
): Future[AsyncIter[U]] {.async: (raises: [CancelledError]).} =
var nextFutU: Option[Future[U]]
proc tryFetch(): Future[void] {.async.} =
proc tryFetch(): Future[void] {.async: (raises: [CancelledError]).} =
nextFutU = Future[U].none
while not iter.finished:
let futT = iter.next()
@ -157,7 +157,7 @@ proc mapFilter*[T, U](
proc filter*[T](
iter: AsyncIter[T], predicate: Function[T, Future[bool]]
): Future[AsyncIter[T]] {.async.} =
): Future[AsyncIter[T]] {.async: (raises: [CancelledError]).} =
proc wrappedPredicate(t: T): Future[Option[T]] {.async.} =
if await predicate(t):
some(t)

View File

@ -0,0 +1,234 @@
## Nim-Codex
## Copyright (c) 2025 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import std/sugar
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import ./iter
## SafeAsyncIter[T] is similar to `AsyncIter[Future[T]]`
## but does not throw exceptions others than CancelledError.
## It is thus way easier to use with checked exceptions
##
##
## Public interface:
##
## Attributes
## - next - allows to set a custom function to be called when the next item is requested
##
## Operations:
## - new - to create a new async iterator (SafeAsyncIter)
## - finish - to finish the async iterator
## - finished - to check if the async iterator is finished
## - next - to get the next item from the async iterator
## - items - to iterate over the async iterator
## - pairs - to iterate over the async iterator and return the index of each item
## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (SafeAsyncIter)
## - map - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter)
## - mapFilter - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) and apply filtering at the same time
## - filter - to filter an async iterator (SafeAsyncIter) returning another async iterator (SafeAsyncIter)
## - delayBy - to delay each item returned by async iter by a given duration
## - empty - to create an empty async iterator (SafeAsyncIter)
type
SafeFunction[T, U] =
proc(fut: T): Future[U] {.async: (raises: [CancelledError]), gcsafe, closure.}
SafeIsFinished = proc(): bool {.raises: [], gcsafe, closure.}
SafeGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]), gcsafe.}
SafeAsyncIter*[T] = ref object
finished: bool
next*: SafeGenNext[?!T]
proc flatMap[T, U](
fut: auto, fn: SafeFunction[?!T, ?!U]
): Future[?!U] {.async: (raises: [CancelledError]).} =
let t = await fut
await fn(t)
proc flatMap[T, U](
fut: auto, fn: SafeFunction[?!T, Option[?!U]]
): Future[Option[?!U]] {.async: (raises: [CancelledError]).} =
let t = await fut
await fn(t)
########################################################################
## SafeAsyncIter public interface methods
########################################################################
proc new*[T](
_: type SafeAsyncIter[T],
genNext: SafeGenNext[?!T],
isFinished: IsFinished,
finishOnErr: bool = true,
): SafeAsyncIter[T] =
## Creates a new Iter using elements returned by supplier function `genNext`.
## Iter is finished whenever `isFinished` returns true.
##
var iter = SafeAsyncIter[T]()
proc next(): Future[?!T] {.async: (raises: [CancelledError]).} =
try:
if not iter.finished:
let item = await genNext()
if finishOnErr and err =? item.errorOption:
iter.finished = true
return failure(err)
if isFinished():
iter.finished = true
return item
else:
return failure("SafeAsyncIter is finished but next item was requested")
except CancelledError as err:
iter.finished = true
raise err
if isFinished():
iter.finished = true
iter.next = next
return iter
# forward declaration
proc mapAsync*[T, U](
iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true
): SafeAsyncIter[U]
proc new*[U, V: Ordinal](
_: type SafeAsyncIter[U], slice: HSlice[U, V], finishOnErr: bool = true
): SafeAsyncIter[U] =
## Creates new Iter from a slice
##
let iter = Iter[U].new(slice)
mapAsync[U, U](
iter,
proc(i: U): Future[?!U] {.async: (raises: [CancelledError]).} =
success[U](i),
finishOnErr = finishOnErr,
)
proc new*[U, V, S: Ordinal](
_: type SafeAsyncIter[U], a: U, b: V, step: S = 1, finishOnErr: bool = true
): SafeAsyncIter[U] =
## Creates new Iter in range a..b with specified step (default 1)
##
let iter = Iter[U].new(a, b, step)
mapAsync[U, U](
iter,
proc(i: U): Future[?!U] {.async: (raises: [CancelledError]).} =
U.success(i),
finishOnErr = finishOnErr,
)
proc finish*[T](self: SafeAsyncIter[T]): void =
self.finished = true
proc finished*[T](self: SafeAsyncIter[T]): bool =
self.finished
iterator items*[T](self: SafeAsyncIter[T]): auto {.inline.} =
while not self.finished:
yield self.next()
iterator pairs*[T](self: SafeAsyncIter[T]): auto {.inline.} =
var i = 0
while not self.finished:
yield (i, self.next())
inc(i)
proc mapAsync*[T, U](
iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true
): SafeAsyncIter[U] =
SafeAsyncIter[U].new(
genNext = () => fn(iter.next()),
isFinished = () => iter.finished(),
finishOnErr = finishOnErr,
)
proc map*[T, U](
iter: SafeAsyncIter[T], fn: SafeFunction[?!T, ?!U], finishOnErr: bool = true
): SafeAsyncIter[U] =
SafeAsyncIter[U].new(
genNext = () => iter.next().flatMap(fn),
isFinished = () => iter.finished,
finishOnErr = finishOnErr,
)
proc mapFilter*[T, U](
iter: SafeAsyncIter[T],
mapPredicate: SafeFunction[?!T, Option[?!U]],
finishOnErr: bool = true,
): Future[SafeAsyncIter[U]] {.async: (raises: [CancelledError]).} =
var nextU: Option[?!U]
proc filter(): Future[void] {.async: (raises: [CancelledError]).} =
nextU = none(?!U)
while not iter.finished:
let futT = iter.next()
if mappedValue =? await futT.flatMap(mapPredicate):
nextU = some(mappedValue)
break
proc genNext(): Future[?!U] {.async: (raises: [CancelledError]).} =
let u = nextU.unsafeGet
await filter()
u
proc isFinished(): bool =
nextU.isNone
await filter()
SafeAsyncIter[U].new(genNext, isFinished, finishOnErr = finishOnErr)
proc filter*[T](
iter: SafeAsyncIter[T], predicate: SafeFunction[?!T, bool], finishOnErr: bool = true
): Future[SafeAsyncIter[T]] {.async: (raises: [CancelledError]).} =
proc wrappedPredicate(
t: ?!T
): Future[Option[?!T]] {.async: (raises: [CancelledError]).} =
if await predicate(t):
some(t)
else:
none(?!T)
await mapFilter[T, T](iter, wrappedPredicate, finishOnErr = finishOnErr)
proc delayBy*[T](
iter: SafeAsyncIter[T], d: Duration, finishOnErr: bool = true
): SafeAsyncIter[T] =
## Delays emitting each item by given duration
##
map[T, T](
iter,
proc(t: ?!T): Future[?!T] {.async: (raises: [CancelledError]).} =
await sleepAsync(d)
return t,
finishOnErr = finishOnErr,
)
proc empty*[T](_: type SafeAsyncIter[T]): SafeAsyncIter[T] =
## Creates an empty SafeAsyncIter
##
proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} =
T.failure("Next item requested from an empty SafeAsyncIter")
proc isFinished(): bool =
true
SafeAsyncIter[T].new(genNext, isFinished)

View File

@ -10,17 +10,14 @@
## Timer
## Used to execute a callback in a loop
import pkg/upraises
push:
{.upraises: [].}
{.push raises: [].}
import pkg/chronos
import ../logutils
type
TimerCallback* = proc(): Future[void] {.gcsafe, upraises: [].}
TimerCallback* = proc(): Future[void] {.gcsafe, async: (raises: []).}
Timer* = ref object of RootObj
callback: TimerCallback
interval: Duration
@ -38,8 +35,6 @@ proc timerLoop(timer: Timer) {.async: (raises: []).} =
await sleepAsync(timer.interval)
except CancelledError:
discard # do not propagate as timerLoop is asyncSpawned
except CatchableError as exc:
error "Timer caught unhandled exception: ", name = timer.name, msg = exc.msg
method start*(
timer: Timer, callback: TimerCallback, interval: Duration
@ -51,7 +46,7 @@ method start*(
timer.interval = interval
timer.loopFuture = timerLoop(timer)
method stop*(timer: Timer) {.async, base.} =
method stop*(timer: Timer) {.base, async: (raises: []).} =
if timer.loopFuture != nil and not timer.loopFuture.finished:
trace "Timer stopping: ", name = timer.name
await timer.loopFuture.cancelAndWait()

View File

@ -15,6 +15,7 @@ import pkg/questionable/results
import pkg/codex/stores/repostore
import pkg/codex/utils/asynciter
import pkg/codex/utils/safeasynciter
type MockRepoStore* = ref object of RepoStore
delBlockCids*: seq[Cid]
@ -22,19 +23,17 @@ type MockRepoStore* = ref object of RepoStore
getBeOffset*: int
testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =
method delBlock*(
self: MockRepoStore, cid: Cid
): Future[?!void] {.async: (raises: [CancelledError]).} =
self.delBlockCids.add(cid)
self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid)
return success()
method getBlockExpirations*(
self: MockRepoStore, maxNumber: int, offset: int
): Future[?!AsyncIter[BlockExpiration]] {.async.} =
if self.getBlockExpirationsThrows:
raise new CatchableError
): Future[?!SafeAsyncIter[BlockExpiration]] {.async: (raises: [CancelledError]).} =
self.getBeMaxNumber = maxNumber
self.getBeOffset = offset
@ -43,11 +42,13 @@ method getBlockExpirations*(
limit = min(offset + maxNumber, len(testBlockExpirationsCpy))
let
iter1 = AsyncIter[int].new(offset ..< limit)
iter1 = SafeAsyncIter[int].new(offset ..< limit)
iter2 = map[int, BlockExpiration](
iter1,
proc(i: int): Future[BlockExpiration] {.async.} =
testBlockExpirationsCpy[i],
proc(i: ?!int): Future[?!BlockExpiration] {.async: (raises: [CancelledError]).} =
if i =? i:
return success(testBlockExpirationsCpy[i])
return failure("Unexpected error!"),
)
success(iter2)

View File

@ -26,7 +26,7 @@ method start*(mockTimer: MockTimer, callback: timer.TimerCallback, interval: Dur
mockTimer.mockInterval = interval
inc mockTimer.startCalled
method stop*(mockTimer: MockTimer) {.async.} =
method stop*(mockTimer: MockTimer) {.async: (raises: []).} =
inc mockTimer.stopCalled
method invokeCallback*(mockTimer: MockTimer) {.async, base.} =

View File

@ -120,7 +120,9 @@ asyncchecksuite "Test Node - Host contracts":
(getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.uint64
var fetchedBytes: uint = 0
let onBlocks = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} =
let onBlocks = proc(
blocks: seq[bt.Block]
): Future[?!void] {.async: (raises: [CancelledError]).} =
for blk in blocks:
fetchedBytes += blk.data.len.uint
return success()

View File

@ -73,7 +73,9 @@ asyncchecksuite "Test Node - Basic":
await node.fetchBatched(
manifest,
batchSize = batchSize,
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
proc(
blocks: seq[bt.Block]
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} =
check blocks.len > 0 and blocks.len <= batchSize
return success(),
)
@ -96,7 +98,9 @@ asyncchecksuite "Test Node - Basic":
await node.fetchBatched(
manifest,
batchSize = batchSize,
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
proc(
blocks: seq[bt.Block]
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} =
return failure("Should not be called"),
)
)

View File

@ -37,7 +37,7 @@ suite "sales state 'filled'":
onExpiryUpdatePassedExpiry = -1
let onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
onExpiryUpdatePassedExpiry = expiry
return success()
let context = SalesContext(market: market, onExpiryUpdate: some onExpiryUpdate)

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'initialproving'":
setup:
let onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
receivedChallenge = challenge
return success(proof)
let context = SalesContext(onProve: onProve.some, market: market, clock: clock)
@ -88,7 +88,7 @@ asyncchecksuite "sales state 'initialproving'":
test "switches to errored state when onProve callback fails":
let onProveFailed: OnProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
return failure("oh no!")
let proofFailedContext =

View File

@ -31,7 +31,7 @@ asyncchecksuite "sales state 'proving'":
market = MockMarket.new()
let onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
receivedChallenge = challenge
return success(proof)
let context = SalesContext(market: market, clock: clock, onProve: onProve.some)

View File

@ -44,7 +44,7 @@ asyncchecksuite "sales state 'simulated-proving'":
let onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
return success(proof)
let context = SalesContext(market: market, clock: clock, onProve: onProve.some)
agent = newSalesAgent(context, request.id, slot.slotIndex, request.some)

View File

@ -64,18 +64,18 @@ asyncchecksuite "Sales - start":
reservations = sales.context.reservations
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
sales.onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
queue = sales.context.slotQueue
sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
return success(proof)
itemsProcessed = @[]
expiry = (clock.now() + 42)
@ -185,18 +185,18 @@ asyncchecksuite "Sales":
reservations = sales.context.reservations
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
sales.onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return success()
queue = sales.context.slotQueue
sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
return success(proof)
await sales.start()
itemsProcessed = @[]
@ -362,7 +362,7 @@ asyncchecksuite "Sales":
test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
let blk = bt.Block.new(@[1.byte]).get
await onBatch(blk.repeat(request.ask.slotSize.int))
@ -375,7 +375,7 @@ asyncchecksuite "Sales":
var slotIndex = 0.uint64
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
slotIndex = slot
let blk = bt.Block.new(@[1.byte]).get
await onBatch(blk.repeat(request.ask.slotSize))
@ -450,7 +450,7 @@ asyncchecksuite "Sales":
var storingRequest: StorageRequest
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
storingRequest = request
return success()
@ -463,7 +463,7 @@ asyncchecksuite "Sales":
var storingSlot: uint64
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
storingRequest = request
storingSlot = slot
return success()
@ -476,7 +476,7 @@ asyncchecksuite "Sales":
let error = newException(IOError, "data retrieval failed")
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return failure(error)
createAvailability()
await market.requestStorage(request)
@ -487,7 +487,7 @@ asyncchecksuite "Sales":
var provingSlot: uint64
sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
provingRequest = slot.request
provingSlot = slot.slotIndex
return success(Groth16Proof.example)
@ -527,8 +527,8 @@ asyncchecksuite "Sales":
# which then calls the onClear callback
sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
raise newException(IOError, "proof failed")
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
return failure("proof failed")
var clearedRequest: StorageRequest
var clearedSlotIndex: uint64
sales.onClear = proc(request: StorageRequest, slotIndex: uint64) =
@ -545,7 +545,7 @@ asyncchecksuite "Sales":
let otherHost = Address.example
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.hours(1))
return success()
createAvailability()
@ -561,7 +561,7 @@ asyncchecksuite "Sales":
let origSize = availability.freeSize
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.hours(1))
return success()
createAvailability()
@ -588,7 +588,7 @@ asyncchecksuite "Sales":
let origSize = availability.freeSize
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.hours(1))
return success()
createAvailability()

View File

@ -58,7 +58,7 @@ proc commonBlockStoreTests*(
test "putBlock raises onBlockStored":
var storedCid = Cid.example
proc onStored(cid: Cid) {.async.} =
proc onStored(cid: Cid) {.async: (raises: []).} =
storedCid = cid
store.onBlockStored = onStored.some()
@ -106,10 +106,10 @@ proc commonBlockStoreTests*(
check not handle.failed
check handle.read.isOk
let cids = (await store.listBlocks(blockType = BlockType.Block)).tryGet()
let cidsIter = (await store.listBlocks(blockType = BlockType.Block)).tryGet()
var count = 0
for c in cids:
for c in cidsIter:
if cid =? await c:
check (await store.hasBlock(cid)).tryGet()
count.inc
@ -130,11 +130,11 @@ proc commonBlockStoreTests*(
check not handle.failed
check handle.read.isOk
let cids = (await store.listBlocks(blockType = BlockType.Manifest)).tryGet()
let cidsIter = (await store.listBlocks(blockType = BlockType.Manifest)).tryGet()
var count = 0
for c in cids:
if cid =? (await c):
for c in cidsIter:
if cid =? await c:
check manifestBlock.cid == cid
check (await store.hasBlock(cid)).tryGet()
count.inc
@ -155,11 +155,11 @@ proc commonBlockStoreTests*(
check not handle.failed
check handle.read.isOk
let cids = (await store.listBlocks(blockType = BlockType.Both)).tryGet()
let cidsIter = (await store.listBlocks(blockType = BlockType.Both)).tryGet()
var count = 0
for c in cids:
if cid =? (await c):
for c in cidsIter:
if cid =? await c:
check (await store.hasBlock(cid)).tryGet()
count.inc

View File

@ -73,11 +73,6 @@ suite "BlockMaintainer":
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 0
test "Timer callback should handle Catachable errors":
mockRepoStore.getBlockExpirationsThrows = true
blockMaintainer.start()
await mockTimer.invokeCallback()
test "Subsequent timer callback should call getBlockExpirations on RepoStore with offset":
blockMaintainer.start()
await mockTimer.invokeCallback()

View File

@ -15,7 +15,7 @@ import pkg/codex/stores
import pkg/codex/stores/repostore/operations
import pkg/codex/blocktype as bt
import pkg/codex/clock
import pkg/codex/utils/asynciter
import pkg/codex/utils/safeasynciter
import pkg/codex/merkletree/codex
import ../../asynctest
@ -181,9 +181,8 @@ asyncchecksuite "RepoStore":
var res = newSeq[BlockExpiration]()
for fut in iter:
let be = await fut
res.add(be)
if be =? (await fut):
res.add(be)
res
test "Should store block expiration timestamp":
@ -294,14 +293,14 @@ asyncchecksuite "RepoStore":
test "Should retrieve block expiration information":
proc unpack(
beIter: Future[?!AsyncIter[BlockExpiration]]
): Future[seq[BlockExpiration]] {.async.} =
beIter: auto
): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} =
var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err:
return expirations
for beFut in toSeq(iter):
let value = await beFut
expirations.add(value)
if value =? (await beFut):
expirations.add(value)
return expirations
let

View File

@ -2,6 +2,7 @@ import ./utils/testoptions
import ./utils/testkeyutils
import ./utils/testasyncstatemachine
import ./utils/testasynciter
import ./utils/testsafeasynciter
import ./utils/testtimer
import ./utils/testtrackedfutures

View File

@ -0,0 +1,417 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/codex/utils/iter
import pkg/codex/utils/safeasynciter
import ../../asynctest
import ../helpers
asyncchecksuite "Test SafeAsyncIter":
test "Should be finished":
let iter = SafeAsyncIter[int].empty()
check:
iter.finished == true
test "using with async generator":
let value = 1
var intIter = Iter.new(0 ..< 5)
let expectedSeq = newSeqWith(5, intIter.next())
intIter = Iter.new(0 ..< 5)
proc asyncGen(): Future[?!int] {.async: (raw: true, raises: [CancelledError]).} =
let fut = newFuture[?!int]()
fut.complete(success(intIter.next()))
return fut
let iter = SafeAsyncIter[int].new(asyncGen, () => intIter.finished)
var collected: seq[int]
for iFut in iter:
let iRes = await iFut
if i =? iRes:
collected.add(i)
else:
fail()
check collected == expectedSeq
let nextRes = await iter.next()
assert nextRes.isFailure
check nextRes.error.msg == "SafeAsyncIter is finished but next item was requested"
test "getting async iter for simple sync range iterator":
let iter1 = SafeAsyncIter[int].new(0 ..< 5)
var collected: seq[int]
for iFut in iter1:
let iRes = await iFut
if i =? iRes:
collected.add(i)
else:
fail()
check:
collected == @[0, 1, 2, 3, 4]
test "Should map each item using `map`":
let iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
let iter2 = map[int, string](
iter1,
proc(iRes: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} =
if i =? iRes:
return success($i)
else:
return failure("Some error"),
)
var collected: seq[string]
for fut in iter2:
if i =? (await fut):
collected.add(i)
else:
fail()
check:
collected == @["0", "1", "2", "3", "4"]
test "Should leave only odd items using `filter`":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = await filter[int](
iter1,
proc(i: ?!int): Future[bool] {.async: (raises: [CancelledError]).} =
if i =? i:
return (i mod 2) == 1
else:
return false,
)
var collected: seq[int]
for fut in iter2:
if i =? (await fut):
collected.add(i)
else:
fail()
check:
collected == @[1, 3]
test "Should leave only odd items using `mapFilter`":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = await mapFilter[int, string](
iter1,
proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} =
if i =? i:
if (i mod 2) == 1:
return some(success($i))
Result[system.string, ref CatchableError].none,
)
var collected: seq[string]
for fut in iter2:
if i =? (await fut):
collected.add(i)
else:
fail()
check:
collected == @["1", "3"]
test "Collecting errors on `map` when finish on error is true":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = map[int, string](
iter1,
proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} =
if i =? i:
if i < 3:
return success($i)
else:
return failure("Error on item: " & $i)
return failure("Unexpected error"),
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter2:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["0", "1", "2"]
collectedFailure == @["Error on item: 3"]
iter2.finished
test "Collecting errors on `map` when finish on error is false":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = map[int, string](
iter1,
proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} =
if i =? i:
if i < 3:
return success($i)
else:
return failure("Error on item: " & $i)
return failure("Unexpected error"),
finishOnErr = false,
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter2:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["0", "1", "2"]
collectedFailure == @["Error on item: 3", "Error on item: 4"]
iter2.finished
test "Collecting errors on `map` when errors are mixed with successes":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = map[int, string](
iter1,
proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} =
if i =? i:
if i == 1 or i == 3:
return success($i)
else:
return failure("Error on item: " & $i)
return failure("Unexpected error"),
finishOnErr = false,
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter2:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["1", "3"]
collectedFailure == @["Error on item: 0", "Error on item: 2", "Error on item: 4"]
iter2.finished
test "Collecting errors on `mapFilter` when finish on error is true":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = await mapFilter[int, string](
iter1,
proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} =
if i =? i:
if i == 1:
return some(string.failure("Error on item: " & $i))
elif i < 3:
return some(success($i))
else:
return Result[system.string, ref CatchableError].none
return some(string.failure("Unexpected error")),
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter2:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["0"]
collectedFailure == @["Error on item: 1"]
iter2.finished
test "Collecting errors on `mapFilter` when finish on error is false":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
iter2 = await mapFilter[int, string](
iter1,
proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} =
if i =? i:
if i == 1:
return some(string.failure("Error on item: " & $i))
elif i < 3:
return some(success($i))
else:
return Result[system.string, ref CatchableError].none
return some(string.failure("Unexpected error")),
finishOnErr = false,
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter2:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["0", "2"]
collectedFailure == @["Error on item: 1"]
iter2.finished
test "Collecting errors on `filter` when finish on error is false":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5)
iter2 = map[int, string](
iter1,
proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} =
if i =? i:
if i == 1 or i == 2:
return failure("Error on item: " & $i)
elif i < 4:
return success($i)
return failure("Unexpected error"),
finishOnErr = false,
)
iter3 = await filter[string](
iter2,
proc(i: ?!string): Future[bool] {.async: (raises: [CancelledError]).} =
without i =? i, err:
if err.msg == "Error on item: 1":
return false
else:
return true
if i == "0":
return false
else:
return true,
finishOnErr = false,
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter3:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["3"]
collectedFailure == @["Error on item: 2", "Unexpected error"]
iter3.finished
test "Collecting errors on `filter` when finish on error is true":
let
iter1 = SafeAsyncIter[int].new(0 ..< 5)
iter2 = map[int, string](
iter1,
proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} =
if i =? i:
if i == 3:
return failure("Error on item: " & $i)
elif i < 3:
return success($i)
return failure("Unexpected error"),
finishOnErr = false,
)
iter3 = await filter[string](
iter2,
proc(i: ?!string): Future[bool] {.async: (raises: [CancelledError]).} =
without i =? i, err:
if err.msg == "Unexpected error":
return false
else:
return true
if i == "0":
return false
else:
return true,
)
var collectedSuccess: seq[string]
var collectedFailure: seq[string]
for fut in iter3:
without i =? (await fut), err:
collectedFailure.add(err.msg)
continue
collectedSuccess.add(i)
check:
collectedSuccess == @["1", "2"]
# On error iterator finishes and returns the error of the item
# that caused the error = that's why we see it here
collectedFailure == @["Error on item: 3"]
iter3.finished
test "Should propagate cancellation error immediately":
# This test can be a bit tricky to understand because it is
# quite tightly coupled with the way the iterator is implemented.
# When `mapFilter` is called, it already performs first iteration
# step: this is necessary, so that if there is nothing there left
# after filtering, the iterator state should be market as "finished"
# before event trying to call `next()` for the very first time (a standard
# practice is for the called to check if the iterator is finished before
# attempting to call `next()`). Thus, internally, the value that is to be
# returned for the first iteration is already resolved and ready to be returned.
# And this follows in the same for the next iterations. On calling `next()`
# the iterator first makes a temporary copy of the value already captured in
# the precious step, awaits for the next value (and if there is no more values
# to be returned it marks the iterator as finished), and then returns the
# local copy of the previously captured value.
# Now, to make sure that this mechanism works, and to document its
# cancellation semantics, this test shows that when the async predicate
# function is cancelled, this cancellation has immediate effect, which means
# that `next()` (or more precisely `getNext()` in `mapFilter` function), is
# interrupted immediately. If this is the case, the the iterator be interrupted
# before `next()` returns this locally captured value from the previous
# iteration and this is exactly the reason why at the end of the test
# we expect only values "0" and "1" to be collected while value "2" - although
# already resolved and ready to be returned, is not returned because of the
# cancellation of the async predicate function.
let fut: Future[Option[?!string]].Raising([CancelledError]) =
Future[Option[?!string]].Raising([CancelledError]).init("testsafeasynciter")
let iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis)
let iter2 = await mapFilter[int, string](
iter1,
proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} =
if i =? i:
if (i < 3):
return some(success($i))
return await fut,
)
proc cancelFut(): Future[void] {.async.} =
await sleepAsync(100.millis)
await fut.cancelAndWait()
asyncSpawn(cancelFut())
var collected: seq[string]
expect CancelledError:
for fut in iter2:
if i =? (await fut):
collected.add(i)
else:
fail()
check:
# We expect only values "0" and "1" to be collected
# and not value "2" that - although resolved and ready to be returned -
# will not be returned because of the cancellation.
collected == @["0", "1"]
iter2.finished

View File

@ -21,17 +21,14 @@ asyncchecksuite "Timer":
var numbersState = 0
var lettersState = 'a'
proc numbersCallback(): Future[void] {.async.} =
proc numbersCallback(): Future[void] {.async: (raises: []).} =
output &= $numbersState
inc numbersState
proc lettersCallback(): Future[void] {.async.} =
proc lettersCallback(): Future[void] {.async: (raises: []).} =
output &= $lettersState
inc lettersState
proc exceptionCallback(): Future[void] {.async.} =
raise newException(CatchableError, "Test Exception")
proc startNumbersTimer() =
timer1.start(numbersCallback, 10.milliseconds)
@ -73,11 +70,6 @@ asyncchecksuite "Timer":
await sleepAsync(30.milliseconds)
check output == stoppedOutput
test "Exceptions raised in timer callback are handled":
timer1.start(exceptionCallback, 10.milliseconds)
await sleepAsync(30.milliseconds)
await timer1.stop()
test "Starting both timers should execute callbacks sequentially":
startNumbersTimer()
startLettersTimer()