Fix/rework async exceptions (#1130)

* cleanup imports and logs

* add BlockHandle type

* revert deps

* refactor: async error handling and future tracking improvements

- Update async procedures to use explicit raises annotation
- Modify TrackedFutures to handle futures with no raised exceptions
- Replace `asyncSpawn` with explicit future tracking
- Update test suites to use `unittest2`
- Standardize error handling across network and async components
- Remove deprecated error handling patterns

This commit introduces a more robust approach to async error handling and future management, improving type safety and reducing potential runtime errors (see the sketch after this commit list).

* bump nim-serde

* remove asyncSpawn

* rework background downloads and prefetch

* improve logging

* refactor: enhance async procedures with error handling and raise annotations

* misc cleanup

* misc

* refactor: implement allFinishedFailed to aggregate future results with success and failure tracking

* refactor: update error handling in reader procedures to raise ChunkerError and CancelledError

* refactor: improve error handling in wantListHandler and accountHandler procedures

* refactor: simplify LPStreamReadError creation by consolidating parameters

* refactor: enhance error handling in AsyncStreamWrapper to catch unexpected errors

* refactor: enhance error handling in advertiser and discovery loops to improve resilience

* misc

* refactor: improve code structure and readability

* remove cancellation from addSlotToQueue

* refactor: add assertion for unexpected errors in local store checks

* refactor: prevent tracking of finished futures and improve test assertions

* refactor: improve error handling in local store checks

* remove usage of msgDetail

* feat: add initial implementation of discovery engine and related components

* refactor: improve task scheduling logic by removing unnecessary break statement

* break after scheduling a task

* make taskHandler cancelable

* refactor: update async handlers to raise CancelledError

* refactor(advertiser): streamline error handling and improve task flow in advertise loops

* fix: correct spelling of "divisible" in error messages and comments

* refactor(discovery): simplify discovery task loop and improve error handling

* refactor(engine): filter peers before processing in cancelBlocks procedure
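
In sketch form, the pattern these commits converge on looks like this (illustrative names; assumes chronos with asyncraises support and the TrackedFutures helper from codex/utils/trackedfutures):

import pkg/chronos
import ./utils/trackedfutures # assumed path of the TrackedFutures helper

type Worker = ref object # hypothetical service with a background loop
  running: bool
  trackedFutures: TrackedFutures

proc runLoop(w: Worker) {.async: (raises: []).} =
  # The empty raises list is enforced at compile time: anything other
  # than cancellation must be handled inside the proc body.
  try:
    while w.running:
      await sleepAsync(10.millis)
  except CancelledError:
    discard # cancellation simply ends the loop

proc start(w: Worker) =
  w.running = true
  w.trackedFutures.track(w.runLoop()) # replaces `asyncSpawn w.runLoop()`

proc stop(w: Worker) {.async: (raises: []).} =
  w.running = false
  await w.trackedFutures.cancelTracked() # cancel and await all tracked futures

# construction and shutdown, e.g.:
#   let w = Worker(trackedFutures: TrackedFutures())
#   w.start()
#   waitFor w.stop()

Because every tracked future is declared raises: [], shutdown cannot surface unexpected exceptions; cancellation is the only signal left.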

Author: Dmitriy Ryajov
Date: 2025-03-13 08:33:15 -06:00 (committed by GitHub)
Parent: 2538ff8da3
Commit: 1cac3e2a11
74 changed files with 937 additions and 690 deletions


@ -41,80 +41,86 @@ type Advertiser* = ref object of RootObj
advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseLocalStoreLoop*: Future[void].Raising([]) # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
trackedFutures*: TrackedFutures # Advertise tasks futures
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} =
proc addCidToQueue(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]).} =
if cid notin b.advertiseQueue:
await b.advertiseQueue.put(cid)
trace "Advertising", cid
proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]).} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
try:
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)
# announce manifest cid and tree cid
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)
except CancelledError as exc:
trace "Cancelled advertise block", cid
raise exc
except CatchableError as e:
error "failed to advertise block", cid, error = e.msgDetail
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
try:
while b.advertiserRunning:
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
except CatchableError as e:
error "Error in advertise local store loop", error = e.msgDetail
raiseAssert("Unexpected exception in advertiseLocalStoreLoop")
await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
error "failed to advertise blocks in local store", error = e.msgDetail
except CancelledError:
warn "Cancelled advertise local store loop"
info "Exiting advertise task loop"
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
try:
while b.advertiserRunning:
let cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
continue
try:
let request = b.discovery.provide(cid)
let request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
b.inFlightAdvReqs[cid] = request
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
await request
finally:
defer:
b.inFlightAdvReqs.del(cid)
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg
await request
except CancelledError:
warn "Cancelled advertise task runner"
info "Exiting advertise task runner"
proc start*(b: Advertiser) {.async.} =
proc start*(b: Advertiser) {.async: (raises: []).} =
## Start the advertiser
##
@ -134,13 +140,11 @@ proc start*(b: Advertiser) {.async.} =
for i in 0 ..< b.concurrentAdvReqs:
let fut = b.processQueueLoop()
b.trackedFutures.track(fut)
asyncSpawn fut
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.trackedFutures.track(b.advertiseLocalStoreLoop)
asyncSpawn b.advertiseLocalStoreLoop
proc stop*(b: Advertiser) {.async.} =
proc stop*(b: Advertiser) {.async: (raises: []).} =
## Stop the advertiser
##

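The advertiser's field change above, Future[void] becoming Future[void].Raising([]), is what makes the tracking checkable; a minimal sketch of the idea (illustrative names):

import pkg/chronos

type Service = ref object # hypothetical holder for a loop handle
  loopFut: Future[void].Raising([])

proc runLoop(s: Service) {.async: (raises: []).} =
  try:
    await sleepAsync(1.millis)
  except CancelledError:
    discard

proc start(s: Service) =
  # Compiles only because runLoop is declared {.async: (raises: []).};
  # a plain {.async.} proc cannot be assigned to the Raising([]) field.
  s.loopFut = s.runLoop()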

@ -48,7 +48,7 @@ type DiscoveryEngine* = ref object of RootObj
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
@ -57,30 +57,21 @@ type DiscoveryEngine* = ref object of RootObj
# Inflight discovery requests
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
try:
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
await b.discoveryQueue.put(cid)
except CancelledError:
trace "Discovery loop cancelled"
return
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg
try:
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
await sleepAsync(b.discoveryLoopSleep)
except CancelledError:
discard # do not propagate as discoveryQueueLoop was asyncSpawned
except CancelledError:
trace "Discovery loop cancelled"
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
## Run discovery tasks
##
while b.discEngineRunning:
try:
try:
while b.discEngineRunning:
let cid = await b.discoveryQueue.get()
if cid in b.inFlightDiscReqs:
@ -90,35 +81,28 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
let haves = b.peers.peersHave(cid)
if haves.len < b.minPeersPerBlock:
try:
let request = b.discovery.find(cid).wait(DefaultDiscoveryTimeout)
let request = b.discovery.find(cid)
b.inFlightDiscReqs[cid] = request
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
b.inFlightDiscReqs[cid] = request
defer:
b.inFlightDiscReqs.del(cid)
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
let peers = await request
if (await request.withTimeout(DefaultDiscoveryTimeout)) and
peers =? (await request).catch:
let dialed = await allFinished(peers.mapIt(b.network.dialPeer(it.data)))
for i, f in dialed:
if f.failed:
await b.discovery.removeProvider(peers[i].data.peerId)
finally:
b.inFlightDiscReqs.del(cid)
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
except CancelledError:
trace "Discovery task cancelled"
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg
except Exception as e:
# Raised by b.discovery.removeProvider somehow...
# This should not be catchable, and we should never get here. Therefore,
# raise a Defect.
raiseAssert "Exception when removing provider"
except CancelledError:
trace "Discovery task cancelled"
return
info "Exiting discovery task runner"
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) =
for cid in cids:
if cid notin b.discoveryQueue:
try:
@ -126,11 +110,11 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg
proc start*(b: DiscoveryEngine) {.async.} =
proc start*(b: DiscoveryEngine) {.async: (raises: []).} =
## Start the discengine task
##
trace "Discovery engine start"
trace "Discovery engine starting"
if b.discEngineRunning:
warn "Starting discovery engine twice"
@ -140,12 +124,13 @@ proc start*(b: DiscoveryEngine) {.async.} =
for i in 0 ..< b.concurrentDiscReqs:
let fut = b.discoveryTaskLoop()
b.trackedFutures.track(fut)
asyncSpawn fut
b.discoveryLoop = b.discoveryQueueLoop()
b.trackedFutures.track(b.discoveryLoop)
proc stop*(b: DiscoveryEngine) {.async.} =
trace "Discovery engine started"
proc stop*(b: DiscoveryEngine) {.async: (raises: []).} =
## Stop the discovery engine
##

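The discovery task above replaces the raising request.wait(DefaultDiscoveryTimeout) with the non-raising withTimeout/catch combination; a hedged sketch of that idiom (findWithTimeout is illustrative, the cancelAndWait cleanup is assumed):

proc findWithTimeout(
    d: Discovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
  let request = d.find(cid)
  # withTimeout returns false on timeout instead of raising AsyncTimeoutError,
  # and catch folds a failed future into a Result instead of re-raising
  if (await request.withTimeout(DefaultDiscoveryTimeout)) and
      peers =? (await request).catch:
    return peers
  await request.cancelAndWait() # assumed cleanup when timed out or failed
  return @[]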

@ -93,12 +93,15 @@ type
price*: UInt256
# attach task scheduler to engine
proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} =
self.taskQueue.pushOrUpdateNoWait(task).isOk()
proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, raises: [].} =
if self.taskQueue.pushOrUpdateNoWait(task).isOk():
trace "Task scheduled for peer", peer = task.id
else:
warn "Unable to schedule task for peer", peer = task.id
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
proc start*(self: BlockExcEngine) {.async.} =
proc start*(self: BlockExcEngine) {.async: (raises: []).} =
## Start the blockexc task
##
@ -115,7 +118,7 @@ proc start*(self: BlockExcEngine) {.async.} =
let fut = self.blockexcTaskRunner()
self.trackedFutures.track(fut)
proc stop*(self: BlockExcEngine) {.async.} =
proc stop*(self: BlockExcEngine) {.async: (raises: []).} =
## Stop the blockexc engine
##
@ -135,7 +138,7 @@ proc stop*(self: BlockExcEngine) {.async.} =
proc sendWantHave(
self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError]).} =
for p in peers:
let toAsk = addresses.filterIt(it notin p.peerHave)
trace "Sending wantHave request", toAsk, peer = p.id
@ -144,7 +147,7 @@ proc sendWantHave(
proc sendWantBlock(
self: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError]).} =
trace "Sending wantBlock request to", addresses, peer = blockPeer.id
await self.network.request.sendWantList(
blockPeer.id, addresses, wantType = WantType.WantBlock
@ -229,7 +232,7 @@ proc requestBlock*(
proc blockPresenceHandler*(
self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
) {.async.} =
) {.async: (raises: []).} =
trace "Received block presence from peer", peer, blocks = blocks.mapIt($it)
let
peerCtx = self.peers.get(peer)
@ -249,20 +252,23 @@ proc blockPresenceHandler*(
if dontWantCids.len > 0:
peerCtx.cleanPresence(dontWantCids)
let ourWantCids = ourWantList.filter do(address: BlockAddress) -> bool:
if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and
not self.pendingBlocks.isInFlight(address):
self.pendingBlocks.setInFlight(address, true)
self.pendingBlocks.decRetries(address)
true
else:
false
let ourWantCids = ourWantList.filterIt(
it in peerHave and not self.pendingBlocks.retriesExhausted(it) and
not self.pendingBlocks.isInFlight(it)
)
for address in ourWantCids:
self.pendingBlocks.setInFlight(address, true)
self.pendingBlocks.decRetries(address)
if ourWantCids.len > 0:
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
await self.sendWantBlock(ourWantCids, peerCtx)
if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption:
warn "Failed to send wantBlock to peer", peer, err = err.msg
proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
proc scheduleTasks(
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
) {.async: (raises: [CancelledError]).} =
let cids = blocksDelivery.mapIt(it.blk.cid)
# schedule any new peers to provide blocks to
@ -271,15 +277,21 @@ proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.a
# schedule a peer if it wants at least one cid
# and we have it in our local store
if c in p.peerWantsCids:
if await (c in self.localStore):
if self.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
warn "Unable to schedule task for peer", peer = p.id
try:
if await (c in self.localStore):
# TODO: the try/except should go away once blockstore tracks exceptions
self.scheduleTask(p)
break
except CancelledError as exc:
warn "Checking local store canceled", cid = c, err = exc.msg
return
except CatchableError as exc:
error "Error checking local store for cid", cid = c, err = exc.msg
raiseAssert "Unexpected error checking local store for cid"
break # do next peer
proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
proc cancelBlocks(
self: BlockExcEngine, addrs: seq[BlockAddress]
) {.async: (raises: [CancelledError]).} =
## Tells neighboring peers that we're no longer interested in a block.
##
@ -289,35 +301,43 @@ proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
trace "Sending block request cancellations to peers",
addrs, peers = self.peers.peerIds
proc mapPeers(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
let blocks = addrs.filter do(a: BlockAddress) -> bool:
a in peerCtx.blocks
proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
await self.network.request.sendWantCancellations(
peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx)
)
if blocks.len > 0:
trace "Sending block request cancellations to peer", peer = peerCtx.id, blocks
await self.network.request.sendWantCancellations(
peer = peerCtx.id, addresses = blocks
return peerCtx
try:
let (succeededFuts, failedFuts) = await allFinishedFailed(
toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map(
processPeer
)
)
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx):
peerCtx.cleanPresence(addrs)
peerCtx
let failed = (await allFinished(map(toSeq(self.peers.peers.values), mapPeers))).filterIt(
it.failed
)
if failed.len > 0:
warn "Failed to send block request cancellations to peers", peers = failed.len
else:
trace "Block request cancellations sent to peers", peers = self.peers.len
if failedFuts.len > 0:
warn "Failed to send block request cancellations to peers", peers = failedFuts.len
else:
trace "Block request cancellations sent to peers", peers = self.peers.len
except CancelledError as exc:
warn "Error sending block request cancellations", error = exc.msg
raise exc
except CatchableError as exc:
warn "Error sending block request cancellations", error = exc.msg
proc resolveBlocks*(
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
) {.async.} =
) {.async: (raises: [CancelledError]).} =
self.pendingBlocks.resolve(blocksDelivery)
await self.scheduleTasks(blocksDelivery)
await self.cancelBlocks(blocksDelivery.mapIt(it.address))
proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} =
proc resolveBlocks*(
self: BlockExcEngine, blocks: seq[Block]
) {.async: (raises: [CancelledError]).} =
await self.resolveBlocks(
blocks.mapIt(
BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid))
@ -326,7 +346,7 @@ proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} =
proc payForBlocks(
self: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery]
) {.async.} =
) {.async: (raises: [CancelledError]).} =
let
sendPayment = self.network.request.sendPayment
price = peer.price(blocksDelivery.mapIt(it.address))
@ -367,7 +387,7 @@ proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void =
proc blocksDeliveryHandler*(
self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]
) {.async.} =
) {.async: (raises: []).} =
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))
var validatedBlocksDelivery: seq[BlockDelivery]
@ -376,41 +396,47 @@ proc blocksDeliveryHandler*(
peer = peer
address = bd.address
if err =? self.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", msg = err.msg
continue
if err =? (await self.localStore.putBlock(bd.blk)).errorOption:
error "Unable to store block", err = err.msg
continue
if bd.address.leaf:
without proof =? bd.proof:
error "Proof expected for a leaf block delivery"
try:
if err =? self.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", msg = err.msg
continue
if err =? (
await self.localStore.putCidAndProof(
bd.address.treeCid, bd.address.index, bd.blk.cid, proof
)
).errorOption:
error "Unable to store proof and cid for a block"
if err =? (await self.localStore.putBlock(bd.blk)).errorOption:
error "Unable to store block", err = err.msg
continue
if bd.address.leaf:
without proof =? bd.proof:
warn "Proof expected for a leaf block delivery"
continue
if err =? (
await self.localStore.putCidAndProof(
bd.address.treeCid, bd.address.index, bd.blk.cid, proof
)
).errorOption:
warn "Unable to store proof and cid for a block"
continue
except CatchableError as exc:
warn "Error handling block delivery", error = exc.msg
continue
validatedBlocksDelivery.add(bd)
await self.resolveBlocks(validatedBlocksDelivery)
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
let peerCtx = self.peers.get(peer)
if peerCtx != nil:
await self.payForBlocks(peerCtx, blocksDelivery)
## shouldn't we remove them from the want-list instead of this:
peerCtx.cleanPresence(blocksDelivery.mapIt(it.address))
if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption:
warn "Error paying for blocks", err = err.msg
return
if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption:
warn "Error resolving blocks", err = err.msg
return
proc wantListHandler*(
self: BlockExcEngine, peer: PeerId, wantList: WantList
) {.async.} =
) {.async: (raises: []).} =
trace "Received want list from peer", peer, wantList = wantList.entries.len
let peerCtx = self.peers.get(peer)
@ -422,68 +448,81 @@ proc wantListHandler*(
presence: seq[BlockPresence]
schedulePeer = false
for e in wantList.entries:
let idx = peerCtx.peerWants.findIt(it.address == e.address)
try:
for e in wantList.entries:
let idx = peerCtx.peerWants.findIt(it.address == e.address)
logScope:
peer = peerCtx.id
address = e.address
wantType = $e.wantType
logScope:
peer = peerCtx.id
address = e.address
wantType = $e.wantType
if idx < 0: # Adding new entry to peer wants
let
have = await e.address in self.localStore
price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
if idx < 0: # Adding new entry to peer wants
let
have =
try:
await e.address in self.localStore
except CatchableError as exc:
# TODO: should not be necessary once we have proper exception tracking on the BlockStore interface
false
price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
if e.cancel:
trace "Received cancelation for untracked block, skipping", address = e.address
continue
if e.cancel:
trace "Received cancelation for untracked block, skipping",
address = e.address
continue
trace "Processing want list entry", wantList = $e
case e.wantType
of WantType.WantHave:
if have:
presence.add(
BlockPresence(
address: e.address, `type`: BlockPresenceType.Have, price: price
)
)
else:
if e.sendDontHave:
trace "Processing want list entry", wantList = $e
case e.wantType
of WantType.WantHave:
if have:
presence.add(
BlockPresence(
address: e.address, `type`: BlockPresenceType.DontHave, price: price
address: e.address, `type`: BlockPresenceType.Have, price: price
)
)
else:
if e.sendDontHave:
presence.add(
BlockPresence(
address: e.address, `type`: BlockPresenceType.DontHave, price: price
)
)
codex_block_exchange_want_have_lists_received.inc()
of WantType.WantBlock:
peerCtx.peerWants.add(e)
schedulePeer = true
codex_block_exchange_want_block_lists_received.inc()
else: # Updating existing entry in peer wants
# peer doesn't want this block anymore
if e.cancel:
trace "Canceling want for block", address = e.address
peerCtx.peerWants.del(idx)
trace "Canceled block request", address = e.address, len = peerCtx.peerWants.len
else:
if e.wantType == WantType.WantBlock:
codex_block_exchange_want_have_lists_received.inc()
of WantType.WantBlock:
peerCtx.peerWants.add(e)
schedulePeer = true
# peer might want to ask for the same cid with
# different want params
trace "Updating want for block", address = e.address
peerCtx.peerWants[idx] = e # update entry
trace "Updated block request", address = e.address, len = peerCtx.peerWants.len
codex_block_exchange_want_block_lists_received.inc()
else: # Updating existing entry in peer wants
# peer doesn't want this block anymore
if e.cancel:
trace "Canceling want for block", address = e.address
peerCtx.peerWants.del(idx)
trace "Canceled block request",
address = e.address, len = peerCtx.peerWants.len
else:
if e.wantType == WantType.WantBlock:
schedulePeer = true
# peer might want to ask for the same cid with
# different want params
trace "Updating want for block", address = e.address
peerCtx.peerWants[idx] = e # update entry
trace "Updated block request",
address = e.address, len = peerCtx.peerWants.len
if presence.len > 0:
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await self.network.request.sendPresence(peer, presence)
if presence.len > 0:
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await self.network.request.sendPresence(peer, presence)
if schedulePeer and not self.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer
if schedulePeer:
self.scheduleTask(peerCtx)
except CancelledError as exc: #TODO: replace with CancelledError
warn "Error processing want list", error = exc.msg
proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.async.} =
proc accountHandler*(
self: BlockExcEngine, peer: PeerId, account: Account
) {.async: (raises: []).} =
let context = self.peers.get(peer)
if context.isNil:
return
@ -492,7 +531,7 @@ proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.asy
proc paymentHandler*(
self: BlockExcEngine, peer: PeerId, payment: SignedState
) {.async.} =
) {.async: (raises: []).} =
trace "Handling payments", peer
without context =? self.peers.get(peer).option and account =? context.account:
@ -505,7 +544,9 @@ proc paymentHandler*(
else:
context.paymentChannel = self.wallet.acceptChannel(payment).option
proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} =
proc setupPeer*(
self: BlockExcEngine, peer: PeerId
) {.async: (raises: [CancelledError]).} =
## Perform initial setup, such as want
## list exchange
##
@ -524,9 +565,10 @@ proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} =
await self.network.request.sendWantList(peer, cids, full = true)
if address =? self.pricing .? address:
trace "Sending account to peer", peer
await self.network.request.sendAccount(peer, Account(address: address))
proc dropPeer*(self: BlockExcEngine, peer: PeerId) =
proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
## Cleanup disconnected peer
##
@ -535,7 +577,9 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) =
# drop the peer from the peers table
self.peers.remove(peer)
proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
proc taskHandler*(
self: BlockExcEngine, task: BlockExcPeerCtx
) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} =
# Send to the peer blocks he wants to get,
# if they present in our local store
@ -572,8 +616,11 @@ proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.}
let
blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup))
blocksDelivery =
blocksDeliveryFut.filterIt(it.completed and it.read.isOk).mapIt(it.read.get)
blocksDelivery = blocksDeliveryFut.filterIt(it.completed and it.value.isOk).mapIt:
if bd =? it.value:
bd
else:
raiseAssert "Unexpected error in local lookup"
# All the wants that failed local lookup must be set to not-in-flight again.
let
@ -595,15 +642,12 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
##
trace "Starting blockexc task runner"
while self.blockexcRunning:
try:
try:
while self.blockexcRunning:
let peerCtx = await self.taskQueue.pop()
await self.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail
except CatchableError as exc:
error "error running block exchange task", error = exc.msg
info "Exiting blockexc task runner"
@ -644,23 +688,29 @@ proc new*(
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} =
proc blockWantListHandler(
peer: PeerId, wantList: WantList
): Future[void] {.async: (raises: []).} =
self.wantListHandler(peer, wantList)
proc blockPresenceHandler(
peer: PeerId, presence: seq[BlockPresence]
): Future[void] {.gcsafe.} =
): Future[void] {.async: (raises: []).} =
self.blockPresenceHandler(peer, presence)
proc blocksDeliveryHandler(
peer: PeerId, blocksDelivery: seq[BlockDelivery]
): Future[void] {.gcsafe.} =
): Future[void] {.async: (raises: []).} =
self.blocksDeliveryHandler(peer, blocksDelivery)
proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} =
proc accountHandler(
peer: PeerId, account: Account
): Future[void] {.async: (raises: []).} =
self.accountHandler(peer, account)
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
proc paymentHandler(
peer: PeerId, payment: SignedState
): Future[void] {.async: (raises: []).} =
self.paymentHandler(peer, payment)
network.handlers = BlockExcHandlers(

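Several handlers above keep their raises: [] contract with the catch(await ...).errorOption idiom from pkg/questionable/results; a sketch with an illustrative wrapper (engine types as in the section above):

import pkg/questionable/results

proc notifyPeer( # hypothetical wrapper around a raising sender
    self: BlockExcEngine, peer: PeerId, addrs: seq[BlockAddress]
) {.async: (raises: []).} =
  # sendWantHave raises [CancelledError]; catch folds any raise into ?!void
  if err =? catch(await self.sendWantHave(addrs, @[])).errorOption:
    warn "Failed to send wantHave", peer, err = err.msg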

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import std/math
import pkg/nitro
import pkg/questionable/results
@ -15,9 +17,6 @@ import ../peers
export nitro
export results
push:
{.upraises: [].}
const ChainId* = 0.u256 # invalid chain id for now
const Asset* = EthAddress.zero # invalid ERC20 asset address for now
const AmountPerChannel = (10'u64 ^ 18).u256 # 1 asset, ERC20 default is 18 decimals


@ -35,13 +35,15 @@ const
DefaultMaxInflight* = 100
type
WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.}
WantListHandler* =
proc(peer: PeerId, wantList: WantList) {.gcsafe, async: (raises: []).}
BlocksDeliveryHandler* =
proc(peer: PeerId, blocks: seq[BlockDelivery]): Future[void] {.gcsafe.}
proc(peer: PeerId, blocks: seq[BlockDelivery]) {.gcsafe, async: (raises: []).}
BlockPresenceHandler* =
proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.}
PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.}
proc(peer: PeerId, precense: seq[BlockPresence]) {.gcsafe, async: (raises: []).}
AccountHandler* = proc(peer: PeerId, account: Account) {.gcsafe, async: (raises: []).}
PaymentHandler* =
proc(peer: PeerId, payment: SignedState) {.gcsafe, async: (raises: []).}
BlockExcHandlers* = object
onWantList*: WantListHandler
@ -58,15 +60,20 @@ type
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
): Future[void] {.gcsafe.}
WantCancellationSender* =
proc(peer: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.}
BlocksDeliverySender* =
proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.}
PresenceSender* =
proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.}
PaymentSender* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.}
) {.async: (raises: [CancelledError]).}
WantCancellationSender* = proc(peer: PeerId, addresses: seq[BlockAddress]) {.
async: (raises: [CancelledError])
.}
BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]) {.
async: (raises: [CancelledError])
.}
PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]) {.
async: (raises: [CancelledError])
.}
AccountSender* =
proc(peer: PeerId, account: Account) {.async: (raises: [CancelledError]).}
PaymentSender* =
proc(peer: PeerId, payment: SignedState) {.async: (raises: [CancelledError]).}
BlockExcRequest* = object
sendWantList*: WantListSender
@ -98,7 +105,9 @@ proc isSelf*(b: BlockExcNetwork, peer: PeerId): bool =
return b.peerId == peer
proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
proc send*(
b: BlockExcNetwork, id: PeerId, msg: pb.Message
) {.async: (raises: [CancelledError]).} =
## Send message to peer
##
@ -106,8 +115,9 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
trace "Unable to send, peer not found", peerId = id
return
let peer = b.peers[id]
try:
let peer = b.peers[id]
await b.inflightSema.acquire()
await peer.send(msg)
except CancelledError as error:
@ -117,7 +127,9 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
finally:
b.inflightSema.release()
proc handleWantList(b: BlockExcNetwork, peer: NetworkPeer, list: WantList) {.async.} =
proc handleWantList(
b: BlockExcNetwork, peer: NetworkPeer, list: WantList
) {.async: (raises: []).} =
## Handle incoming want list
##
@ -133,7 +145,7 @@ proc sendWantList*(
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
): Future[void] =
) {.async: (raw: true, raises: [CancelledError]).} =
## Send a want message to peer
##
@ -154,14 +166,14 @@ proc sendWantList*(
proc sendWantCancellations*(
b: BlockExcNetwork, id: PeerId, addresses: seq[BlockAddress]
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError]).} =
## Informs a remote peer that we're no longer interested in a set of blocks
##
await b.sendWantList(id = id, addresses = addresses, cancel = true)
proc handleBlocksDelivery(
b: BlockExcNetwork, peer: NetworkPeer, blocksDelivery: seq[BlockDelivery]
) {.async.} =
) {.async: (raises: []).} =
## Handle incoming blocks
##
@ -170,7 +182,7 @@ proc handleBlocksDelivery(
proc sendBlocksDelivery*(
b: BlockExcNetwork, id: PeerId, blocksDelivery: seq[BlockDelivery]
): Future[void] =
) {.async: (raw: true, raises: [CancelledError]).} =
## Send blocks to remote
##
@ -178,7 +190,7 @@ proc sendBlocksDelivery*(
proc handleBlockPresence(
b: BlockExcNetwork, peer: NetworkPeer, presence: seq[BlockPresence]
) {.async.} =
) {.async: (raises: []).} =
## Handle block presence
##
@ -187,7 +199,7 @@ proc handleBlockPresence(
proc sendBlockPresence*(
b: BlockExcNetwork, id: PeerId, presence: seq[BlockPresence]
): Future[void] =
) {.async: (raw: true, raises: [CancelledError]).} =
## Send presence to remote
##
@ -195,20 +207,24 @@ proc sendBlockPresence*(
proc handleAccount(
network: BlockExcNetwork, peer: NetworkPeer, account: Account
) {.async.} =
) {.async: (raises: []).} =
## Handle account info
##
if not network.handlers.onAccount.isNil:
await network.handlers.onAccount(peer.id, account)
proc sendAccount*(b: BlockExcNetwork, id: PeerId, account: Account): Future[void] =
proc sendAccount*(
b: BlockExcNetwork, id: PeerId, account: Account
) {.async: (raw: true, raises: [CancelledError]).} =
## Send account info to remote
##
b.send(id, Message(account: AccountMessage.init(account)))
proc sendPayment*(b: BlockExcNetwork, id: PeerId, payment: SignedState): Future[void] =
proc sendPayment*(
b: BlockExcNetwork, id: PeerId, payment: SignedState
) {.async: (raw: true, raises: [CancelledError]).} =
## Send payment to remote
##
@ -216,7 +232,7 @@ proc sendPayment*(b: BlockExcNetwork, id: PeerId, payment: SignedState): Future[
proc handlePayment(
network: BlockExcNetwork, peer: NetworkPeer, payment: SignedState
) {.async.} =
) {.async: (raises: []).} =
## Handle payment
##
@ -225,7 +241,7 @@ proc handlePayment(
proc rpcHandler(
b: BlockExcNetwork, peer: NetworkPeer, msg: Message
) {.async: (raises: [CatchableError]).} =
) {.async: (raises: []).} =
## handle rpc messages
##
if msg.wantList.entries.len > 0:
@ -250,7 +266,9 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
if peer in b.peers:
return b.peers.getOrDefault(peer, nil)
var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} =
var getConn: ConnProvider = proc(): Future[Connection] {.
async: (raises: [CancelledError])
.} =
try:
trace "Getting new connection stream", peer
return await b.switch.dial(peer, Codec)
@ -262,9 +280,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
if not isNil(b.getConn):
getConn = b.getConn
let rpcHandler = proc(
p: NetworkPeer, msg: Message
) {.async: (raises: [CatchableError]).} =
let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async: (raises: []).} =
await b.rpcHandler(p, msg)
# create new pubsub peer
@ -353,26 +369,32 @@ proc new*(
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
): Future[void] {.gcsafe.} =
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
self.sendWantList(id, cids, priority, cancel, wantType, full, sendDontHave)
proc sendWantCancellations(
id: PeerId, addresses: seq[BlockAddress]
): Future[void] {.gcsafe.} =
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
self.sendWantCancellations(id, addresses)
proc sendBlocksDelivery(
id: PeerId, blocksDelivery: seq[BlockDelivery]
): Future[void] {.gcsafe.} =
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
self.sendBlocksDelivery(id, blocksDelivery)
proc sendPresence(id: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} =
proc sendPresence(
id: PeerId, presence: seq[BlockPresence]
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
self.sendBlockPresence(id, presence)
proc sendAccount(id: PeerId, account: Account): Future[void] {.gcsafe.} =
proc sendAccount(
id: PeerId, account: Account
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
self.sendAccount(id, account)
proc sendPayment(id: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
proc sendPayment(
id: PeerId, payment: SignedState
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
self.sendPayment(id, payment)
self.request = BlockExcRequest(

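The senders above are declared async: (raw: true, ...); in raw mode the body must build and return the Future itself, with no await and no implicit result, so a thin forwarder adds no extra async frame while the declared raises list is still checked. Restated as a sketch (types as above, forwarder name illustrative):

proc sendAccountFwd(
    b: BlockExcNetwork, id: PeerId, account: Account
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
  # no copy of the future, no extra frame: just forward what send() returns
  b.send(id, Message(account: AccountMessage.init(account)))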

@ -7,9 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push:
{.upraises: [].}
{.push raises: [].}
import pkg/chronos
import pkg/libp2p
@ -18,6 +16,7 @@ import ../protobuf/blockexc
import ../protobuf/message
import ../../errors
import ../../logutils
import ../../utils/trackedfutures
logScope:
topics = "codex blockexcnetworkpeer"
@ -25,11 +24,10 @@ logScope:
const DefaultYieldInterval = 50.millis
type
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
ConnProvider* =
proc(): Future[Connection] {.gcsafe, async: (raises: [CancelledError]).}
RPCHandler* = proc(
peer: NetworkPeer, msg: Message
): Future[void].Raising(CatchableError) {.gcsafe.}
RPCHandler* = proc(peer: NetworkPeer, msg: Message) {.gcsafe, async: (raises: []).}
NetworkPeer* = ref object of RootObj
id*: PeerId
@ -37,55 +35,60 @@ type
sendConn: Connection
getConn: ConnProvider
yieldInterval*: Duration = DefaultYieldInterval
trackedFutures: TrackedFutures
proc connected*(b: NetworkPeer): bool =
not (isNil(b.sendConn)) and not (b.sendConn.closed or b.sendConn.atEof)
proc connected*(self: NetworkPeer): bool =
not (isNil(self.sendConn)) and not (self.sendConn.closed or self.sendConn.atEof)
proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} =
if isNil(conn):
trace "No connection to read from", peer = b.id
trace "No connection to read from", peer = self.id
return
trace "Attaching read loop", peer = b.id, connId = conn.oid
trace "Attaching read loop", peer = self.id, connId = conn.oid
try:
var nextYield = Moment.now() + b.yieldInterval
var nextYield = Moment.now() + self.yieldInterval
while not conn.atEof or not conn.closed:
if Moment.now() > nextYield:
nextYield = Moment.now() + b.yieldInterval
nextYield = Moment.now() + self.yieldInterval
trace "Yielding in read loop",
peer = b.id, nextYield = nextYield, interval = b.yieldInterval
peer = self.id, nextYield = nextYield, interval = self.yieldInterval
await sleepAsync(10.millis)
let
data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet()
trace "Received message", peer = b.id, connId = conn.oid
await b.handler(b, msg)
trace "Received message", peer = self.id, connId = conn.oid
await self.handler(self, msg)
except CancelledError:
trace "Read loop cancelled"
except CatchableError as err:
warn "Exception in blockexc read loop", msg = err.msg
finally:
trace "Detaching read loop", peer = b.id, connId = conn.oid
trace "Detaching read loop", peer = self.id, connId = conn.oid
await conn.close()
proc connect*(b: NetworkPeer): Future[Connection] {.async.} =
if b.connected:
trace "Already connected", peer = b.id, connId = b.sendConn.oid
return b.sendConn
proc connect*(
self: NetworkPeer
): Future[Connection] {.async: (raises: [CancelledError]).} =
if self.connected:
trace "Already connected", peer = self.id, connId = self.sendConn.oid
return self.sendConn
b.sendConn = await b.getConn()
asyncSpawn b.readLoop(b.sendConn)
return b.sendConn
self.sendConn = await self.getConn()
self.trackedFutures.track(self.readLoop(self.sendConn))
return self.sendConn
proc send*(b: NetworkPeer, msg: Message) {.async.} =
let conn = await b.connect()
proc send*(
self: NetworkPeer, msg: Message
) {.async: (raises: [CancelledError, LPStreamError]).} =
let conn = await self.connect()
if isNil(conn):
warn "Unable to get send connection for peer message not sent", peer = b.id
warn "Unable to get send connection for peer message not sent", peer = self.id
return
trace "Sending message", peer = b.id, connId = conn.oid
trace "Sending message", peer = self.id, connId = conn.oid
await conn.writeLp(protobufEncode(msg))
func new*(
@ -96,4 +99,9 @@ func new*(
): NetworkPeer =
doAssert(not isNil(connProvider), "should supply connection provider")
NetworkPeer(id: peer, getConn: connProvider, handler: rpcHandler)
NetworkPeer(
id: peer,
getConn: connProvider,
handler: rpcHandler,
trackedFutures: TrackedFutures(),
)


@ -7,16 +7,13 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import std/sequtils
import std/tables
import std/algorithm
import std/sequtils
import pkg/upraises
push:
{.upraises: [].}
import pkg/chronos
import pkg/libp2p


@ -1,8 +1,9 @@
{.push raises: [].}
import pkg/stew/byteutils
import pkg/stint
import pkg/nitro
import pkg/questionable
import pkg/upraises
import ./blockexc
export AccountMessage
@ -11,9 +12,6 @@ export StateChannelUpdate
export stint
export nitro
push:
{.upraises: [].}
type Account* = object
address*: EthAddress


@ -1,8 +1,9 @@
{.push raises: [].}
import libp2p
import pkg/stint
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ./blockexc
import ../../blocktype
@ -11,9 +12,6 @@ export questionable
export stint
export BlockPresenceType
upraises.push:
{.upraises: [].}
type
PresenceMessage* = blockexc.BlockPresence
Presence* = object


@ -28,8 +28,11 @@ const DefaultChunkSize* = DefaultBlockSize
type
# default reader type
ChunkerError* = object of CatchableError
ChunkBuffer* = ptr UncheckedArray[byte]
Reader* = proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
Reader* = proc(data: ChunkBuffer, len: int): Future[int] {.
gcsafe, async: (raises: [ChunkerError, CancelledError])
.}
# Reader that splits input data into fixed-size chunks
Chunker* = ref object
@ -74,7 +77,7 @@ proc new*(
proc reader(
data: ChunkBuffer, len: int
): Future[int] {.gcsafe, async, raises: [Defect].} =
): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} =
var res = 0
try:
while res < len:
@ -85,7 +88,7 @@ proc new*(
raise error
except LPStreamError as error:
error "LPStream error", err = error.msg
raise error
raise newException(ChunkerError, "LPStream error", error)
except CatchableError as exc:
error "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg)
@ -102,7 +105,7 @@ proc new*(
proc reader(
data: ChunkBuffer, len: int
): Future[int] {.gcsafe, async, raises: [Defect].} =
): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} =
var total = 0
try:
while total < len:

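The chunker's reader above narrows stream failures into the module's own error type; a hedged sketch of the translation idiom (LPStream and readOnce from pkg/libp2p assumed, readChunk illustrative):

proc readChunk(
    stream: LPStream, data: ChunkBuffer, len: int
): Future[int] {.async: (raises: [ChunkerError, CancelledError]).} =
  try:
    return await stream.readOnce(addr data[0], len)
  except CancelledError as exc:
    raise exc # cancellation stays cancellation
  except LPStreamError as exc:
    # newException's third argument records the original as `parent`
    raise newException(ChunkerError, "LPStream error: " & exc.msg, exc)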

@ -177,14 +177,20 @@ proc start*(s: CodexServer) {.async.} =
proc stop*(s: CodexServer) {.async.} =
notice "Stopping codex node"
await allFuturesThrowing(
s.restServer.stop(),
s.codexNode.switch.stop(),
s.codexNode.stop(),
s.repoStore.stop(),
s.maintenance.stop(),
let res = await noCancel allFinishedFailed(
@[
s.restServer.stop(),
s.codexNode.switch.stop(),
s.codexNode.stop(),
s.repoStore.stop(),
s.maintenance.stop(),
]
)
if res.failure.len > 0:
error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node"
proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
): CodexServer =


@ -5,6 +5,7 @@ import pkg/chronos
import pkg/stint
import ../clock
import ../conf
import ../utils/trackedfutures
export clock
@ -18,9 +19,12 @@ type OnChainClock* = ref object of Clock
blockNumber: UInt256
started: bool
newBlock: AsyncEvent
trackedFutures: TrackedFutures
proc new*(_: type OnChainClock, provider: Provider): OnChainClock =
OnChainClock(provider: provider, newBlock: newAsyncEvent())
OnChainClock(
provider: provider, newBlock: newAsyncEvent(), trackedFutures: TrackedFutures()
)
proc update(clock: OnChainClock, blck: Block) =
if number =? blck.number and number > clock.blockNumber:
@ -32,15 +36,12 @@ proc update(clock: OnChainClock, blck: Block) =
blockTime = blck.timestamp, blockNumber = number, offset = clock.offset
clock.newBlock.fire()
proc update(clock: OnChainClock) {.async.} =
proc update(clock: OnChainClock) {.async: (raises: []).} =
try:
if latest =? (await clock.provider.getBlock(BlockTag.latest)):
clock.update(latest)
except CancelledError as error:
raise error
except CatchableError as error:
debug "error updating clock: ", error = error.msg
discard
method start*(clock: OnChainClock) {.async.} =
if clock.started:
@ -52,7 +53,7 @@ method start*(clock: OnChainClock) {.async.} =
return
# ignore block parameter; hardhat may call this with pending blocks
asyncSpawn clock.update()
clock.trackedFutures.track(clock.update())
await clock.update()
@ -64,6 +65,7 @@ method stop*(clock: OnChainClock) {.async.} =
return
await clock.subscription.unsubscribe()
await clock.trackedFutures.cancelTracked()
clock.started = false
method now*(clock: OnChainClock): SecondsSince1970 =


@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [].}
import std/algorithm
import std/sequtils
@ -54,70 +56,122 @@ proc toNodeId*(host: ca.Address): NodeId =
readUintBE[256](keccak256.digest(host.toArray).data)
proc findPeer*(d: Discovery, peerId: PeerId): Future[?PeerRecord] {.async.} =
proc findPeer*(
d: Discovery, peerId: PeerId
): Future[?PeerRecord] {.async: (raises: [CancelledError]).} =
trace "protocol.resolve..."
## Find peer using the given Discovery object
##
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
try:
let node = await d.protocol.resolve(toNodeId(peerId))
method find*(d: Discovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
except CancelledError as exc:
warn "Error finding peer", peerId = peerId, exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error finding peer", peerId = peerId, exc = exc.msg
return PeerRecord.none
method find*(
d: Discovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]), base.} =
## Find block providers
##
without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
warn "Error finding providers for block", cid, error = error.msg
return providers.filterIt(not (it.data.peerId == d.peerId))
try:
without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure,
error:
warn "Error finding providers for block", cid, error = error.msg
method provide*(d: Discovery, cid: Cid) {.async, base.} =
return providers.filterIt(not (it.data.peerId == d.peerId))
except CancelledError as exc:
warn "Error finding providers for block", cid, exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error finding providers for block", cid, exc = exc.msg
method provide*(d: Discovery, cid: Cid) {.async: (raises: [CancelledError]), base.} =
## Provide a block Cid
##
let nodes = await d.protocol.addProvider(cid.toNodeId(), d.providerRecord.get)
try:
let nodes = await d.protocol.addProvider(cid.toNodeId(), d.providerRecord.get)
if nodes.len <= 0:
warn "Couldn't provide to any nodes!"
if nodes.len <= 0:
warn "Couldn't provide to any nodes!"
except CancelledError as exc:
warn "Error providing block", cid, exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error providing block", cid, exc = exc.msg
method find*(
d: Discovery, host: ca.Address
): Future[seq[SignedPeerRecord]] {.async, base.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]), base.} =
## Find host providers
##
trace "Finding providers for host", host = $host
without var providers =? (await d.protocol.getProviders(host.toNodeId())).mapFailure,
error:
trace "Error finding providers for host", host = $host, exc = error.msg
return
try:
trace "Finding providers for host", host = $host
without var providers =? (await d.protocol.getProviders(host.toNodeId())).mapFailure,
error:
trace "Error finding providers for host", host = $host, exc = error.msg
return
if providers.len <= 0:
trace "No providers found", host = $host
return
if providers.len <= 0:
trace "No providers found", host = $host
return
providers.sort do(a, b: SignedPeerRecord) -> int:
system.cmp[uint64](a.data.seqNo, b.data.seqNo)
providers.sort do(a, b: SignedPeerRecord) -> int:
system.cmp[uint64](a.data.seqNo, b.data.seqNo)
return providers
return providers
except CancelledError as exc:
warn "Error finding providers for host", host = $host, exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error finding providers for host", host = $host, exc = exc.msg
method provide*(d: Discovery, host: ca.Address) {.async, base.} =
method provide*(
d: Discovery, host: ca.Address
) {.async: (raises: [CancelledError]), base.} =
## Provide hosts
##
trace "Providing host", host = $host
let nodes = await d.protocol.addProvider(host.toNodeId(), d.providerRecord.get)
if nodes.len > 0:
trace "Provided to nodes", nodes = nodes.len
try:
trace "Providing host", host = $host
let nodes = await d.protocol.addProvider(host.toNodeId(), d.providerRecord.get)
if nodes.len > 0:
trace "Provided to nodes", nodes = nodes.len
except CancelledError as exc:
warn "Error providing host", host = $host, exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error providing host", host = $host, exc = exc.msg
method removeProvider*(d: Discovery, peerId: PeerId): Future[void] {.base, gcsafe.} =
method removeProvider*(
d: Discovery, peerId: PeerId
): Future[void] {.base, gcsafe, async: (raises: [CancelledError]).} =
## Remove provider from providers table
##
trace "Removing provider", peerId
d.protocol.removeProvidersLocal(peerId)
try:
await d.protocol.removeProvidersLocal(peerId)
except CancelledError as exc:
warn "Error removing provider", peerId = peerId, exc = exc.msg
raise exc
except CatchableError as exc:
warn "Error removing provider", peerId = peerId, exc = exc.msg
except Exception as exc: # Something in discv5 is raising Exception
warn "Error removing provider", peerId = peerId, exc = exc.msg
raiseAssert("Unexpected Exception in removeProvider")
proc updateAnnounceRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
## Update providers record
@ -145,12 +199,18 @@ proc updateDhtRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
proc start*(d: Discovery) {.async.} =
d.protocol.open()
await d.protocol.start()
proc start*(d: Discovery) {.async: (raises: []).} =
try:
d.protocol.open()
await d.protocol.start()
except CatchableError as exc:
error "Error starting discovery", exc = exc.msg
proc stop*(d: Discovery) {.async.} =
await d.protocol.closeWait()
proc stop*(d: Discovery) {.async: (raises: []).} =
try:
await noCancel d.protocol.closeWait()
except CatchableError as exc:
error "Error stopping discovery", exc = exc.msg
proc new*(
T: type Discovery,

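removeProvider above has to fence a dependency call that can raise a bare Exception, which no checked raises list can express; a sketch of the escalation pattern (removeProviderSafe is an illustrative variant):

proc removeProviderSafe(
    d: Discovery, peerId: PeerId
) {.async: (raises: [CancelledError]).} =
  try:
    await d.protocol.removeProvidersLocal(peerId)
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    warn "Error removing provider", peerId, exc = exc.msg
  except Exception as exc: # discv5 can leak a bare Exception here
    raiseAssert "Unexpected Exception in removeProvider: " & exc.msg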

@ -330,7 +330,7 @@ proc encodeAsync*(
defer:
freeDoubleArray(blockData, blocksLen)
## Create an encode task with block data
## Create an encode task with block data
var task = EncodeTask(
erasure: addr self,
blockSize: blockSize,
@ -540,7 +540,7 @@ proc decodeAsync*(
freeDoubleArray(blocksData, blocksLen)
freeDoubleArray(parityData, parityLen)
## Create a decode task with block data
## Create a decode task with block data
var task = DecodeTask(
erasure: addr self,
blockSize: blockSize,


@ -19,6 +19,8 @@ type
CodexError* = object of CatchableError # base codex error
CodexResult*[T] = Result[T, ref CodexError]
FinishedFailed*[T] = tuple[success: seq[Future[T]], failure: seq[Future[T]]]
template mapFailure*[T, V, E](
exp: Result[T, V], exc: typedesc[E]
): Result[T, ref CatchableError] =
@ -40,35 +42,18 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} =
else:
T.failure("Option is None")
# allFuturesThrowing was moved to the tests in libp2p
proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
var futs: seq[Future[T]]
for fut in args:
futs &= fut
proc call() {.async.} =
var first: ref CatchableError = nil
futs = await allFinished(futs)
for fut in futs:
if fut.failed:
let err = fut.readError()
if err of Defect:
raise err
else:
if err of CancelledError:
raise err
if isNil(first):
first = err
if not isNil(first):
raise first
proc allFinishedFailed*[T](futs: seq[Future[T]]): Future[FinishedFailed[T]] {.async.} =
## Check if all futures have finished or failed
##
## TODO: wip, not sure if we want this - at the minimum,
## we should probably avoid the async transform
return call()
var res: FinishedFailed[T] = (@[], @[])
await allFutures(futs)
for f in futs:
if f.failed:
res.failure.add f
else:
res.success.add f
proc allFutureResult*[T](fut: seq[Future[T]]): Future[?!void] {.async.} =
try:
await allFuturesThrowing(fut)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
return success()
return res

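A usage sketch for allFinishedFailed, mirroring the stop and fetch call sites elsewhere in this diff (stopAll and its inputs are illustrative):

proc stopAll(futs: seq[Future[void]]) {.async.} =
  # await every future without raising, then inspect the failed subset;
  # noCancel shields the shutdown path from cancellation
  let res = await noCancel allFinishedFailed(futs)
  if res.failure.len > 0:
    error "Some components failed to stop", failures = res.failure.len
    raiseAssert "Failed to stop components"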

@ -153,7 +153,11 @@ proc updateExpiry*(
let ensuringFutures = Iter[int].new(0 ..< manifest.blocksCount).mapIt(
self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry)
)
await allFuturesThrowing(ensuringFutures)
let res = await allFinishedFailed(ensuringFutures)
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -186,8 +190,10 @@ proc fetchBatched*(
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr)
let res = await allFinishedFailed(blocks)
if res.failure.len > 0:
trace "Some blocks failed to fetch", len = res.failure.len
return failure("Some blocks failed to fetch (" & $res.failure.len & " )")
if not onBatch.isNil and
batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption:
@ -213,6 +219,30 @@ proc fetchBatched*(
let iter = Iter[int].new(0 ..< manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)
proc fetchDatasetAsync*(
self: CodexNodeRef, manifest: Manifest, fetchLocal = true
): Future[void] {.async: (raises: []).} =
## Asynchronously fetch a dataset in the background.
## This task will be tracked and cleaned up on node shutdown.
##
try:
if err =? (
await self.fetchBatched(
manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal
)
).errorOption:
error "Unable to fetch blocks", err = err.msg
except CancelledError as exc:
trace "Cancelled fetching blocks", exc = exc.msg
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
## Start fetching a dataset in the background.
## The task will be tracked and cleaned up on node shutdown.
##
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} =
## Streams the contents of a single block.
##
@ -223,36 +253,27 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async
without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err:
return failure(err)
proc streamOneBlock(): Future[void] {.async.} =
proc streamOneBlock(): Future[void] {.async: (raises: []).} =
try:
defer:
await stream.pushEof()
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid, exc = exc.msg
discard
finally:
await stream.pushEof()
self.trackedFutures.track(streamOneBlock())
LPStream(stream).success
proc streamEntireDataset(
self: CodexNodeRef,
manifest: Manifest,
manifestCid: Cid,
prefetchBatch = DefaultFetchBatch,
self: CodexNodeRef, manifest: Manifest, manifestCid: Cid
): Future[?!LPStream] {.async.} =
## Streams the contents of the entire dataset described by the manifest.
## Background jobs (erasure decoding and prefetching) will be cancelled when
## the stream is closed.
##
trace "Retrieving blocks from manifest", manifestCid
let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false))
var jobs: seq[Future[void]]
if manifest.protected:
# Retrieve, decode and save to the local store all EC groups
proc erasureJob(): Future[void] {.async.} =
proc erasureJob(): Future[void] {.async: (raises: []).} =
try:
# Spawn an erasure decoding job
let erasure = Erasure.new(
@ -260,36 +281,17 @@ proc streamEntireDataset(
)
without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg
except CancelledError:
trace "Erasure job cancelled", manifestCid
except CatchableError as exc:
trace "Error erasure decoding manifest", manifestCid, exc = exc.msg
jobs.add(erasureJob())
self.trackedFutures.track(erasureJob())
proc prefetch(): Future[void] {.async.} =
try:
if err =?
(await self.fetchBatched(manifest, prefetchBatch, fetchLocal = false)).errorOption:
error "Unable to fetch blocks", err = err.msg
except CancelledError:
trace "Prefetch job cancelled"
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
jobs.add(prefetch())
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async.} =
try:
await stream.join()
finally:
await allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())
# The prefetch task should not fetch from the local store
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid
stream.success
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
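
A hypothetical consumer of the reworked proc, illustrating that the stream is returned immediately while the tracked prefetch fills the local store in the background (the buffer size and error handling are illustrative only, and the fragment assumes an enclosing async proc):

without stream =? (await node.streamEntireDataset(manifest, manifestCid)), err:
  error "Unable to stream dataset", manifestCid, err = err.msg
  return
defer:
  await stream.close()
var buf = newSeq[byte](DefaultBlockSize.int)
while not stream.atEof:
  let n = await stream.readOnce(addr buf[0], buf.len)
  # ... consume buf[0 ..< n] ...
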
proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true
@ -632,8 +634,11 @@ proc onStore(
let ensureExpiryFutures =
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)
let res = await allFinishedFailed(ensureExpiryFutures)
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption:
trace "Unable to process blocks", err = err.msg

View File

@ -315,15 +315,8 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
error "Failed to fetch manifest", err = err.msg
return RestApiResponse.error(Http404, err.msg, headers = headers)
proc fetchDatasetAsync(): Future[void] {.async.} =
try:
if err =? (await node.fetchBatched(manifest)).errorOption:
error "Unable to fetch dataset", cid = cid.get(), err = err.msg
except CatchableError as exc:
error "CatchableError when fetching dataset", cid = cid.get(), exc = exc.msg
discard
asyncSpawn fetchDatasetAsync()
# Start fetching the dataset in the background
node.fetchDatasetAsyncTask(manifest)
let json = %formatManifest(cid.get(), manifest)
return RestApiResponse.response($json, contentType = "application/json")

View File

@ -341,48 +341,51 @@ proc onSlotFreed(sales: Sales, requestId: RequestId, slotIndex: uint64) =
trace "slot freed, adding to queue"
proc addSlotToQueue() {.async: (raises: [CancelledError]).} =
proc addSlotToQueue() {.async: (raises: []).} =
let context = sales.context
let market = context.market
let queue = context.slotQueue
without request =? (await market.getRequest(requestId)), err:
error "unknown request in contract", error = err.msgDetail
return
try:
without request =? (await market.getRequest(requestId)), err:
error "unknown request in contract", error = err.msgDetail
return
# Take the repairing state into consideration to calculate the collateral.
# This is particularly needed because it will affect the priority in the queue
# and we want to give the user the ability to tweak the parameters.
# Adding the repairing state directly in the queue priority calculation
# would not allow this flexibility.
without collateral =?
market.slotCollateral(request.ask.collateralPerSlot, SlotState.Repair), err:
error "Failed to add freed slot to queue: unable to calculate collateral",
error = err.msg
return
# Take the repairing state into consideration to calculate the collateral.
# This is particularly needed because it will affect the priority in the queue
# and we want to give the user the ability to tweak the parameters.
# Adding the repairing state directly in the queue priority calculation
# would not allow this flexibility.
without collateral =?
market.slotCollateral(request.ask.collateralPerSlot, SlotState.Repair), err:
error "Failed to add freed slot to queue: unable to calculate collateral",
error = err.msg
return
if slotIndex > uint16.high.uint64:
error "Cannot cast slot index to uint16, value = ", slotIndex
return
if slotIndex > uint16.high.uint64:
error "Cannot cast slot index to uint16, value = ", slotIndex
return
without slotQueueItem =?
SlotQueueItem.init(request, slotIndex.uint16, collateral = collateral).catch, err:
warn "Too many slots, cannot add to queue", error = err.msgDetail
return
without slotQueueItem =?
SlotQueueItem.init(request, slotIndex.uint16, collateral = collateral).catch,
err:
warn "Too many slots, cannot add to queue", error = err.msgDetail
return
if err =? queue.push(slotQueueItem).errorOption:
if err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists",
error = err.msgDetail
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running",
error = err.msgDetail
if err =? queue.push(slotQueueItem).errorOption:
if err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists",
error = err.msgDetail
elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running",
error = err.msgDetail
except CatchableError as e:
warn "Failed to add slot to queue", error = e.msg
# We could get rid of this by adding the storage ask in the SlotFreed event,
# so we would not need to call getRequest to get the collateralPerSlot.
let fut = addSlotToQueue()
sales.trackedFutures.track(fut)
asyncSpawn fut
proc subscribeRequested(sales: Sales) {.async.} =
let context = sales.context
@ -522,7 +525,9 @@ proc startSlotQueue(sales: Sales) =
let slotQueue = sales.context.slotQueue
let reservations = sales.context.reservations
slotQueue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
slotQueue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
sales.processSlot(item, done)

View File

@ -103,7 +103,6 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
error "Error while waiting for expiry to lapse", error = e.msgDetail
data.cancelled = onCancelled()
asyncSpawn data.cancelled
method onFulfilled*(
agent: SalesAgent, requestId: RequestId

View File

@ -3,7 +3,6 @@ import std/tables
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ../errors
import ../clock
import ../logutils
@ -17,8 +16,9 @@ logScope:
topics = "marketplace slotqueue"
type
OnProcessSlot* =
proc(item: SlotQueueItem, done: Future[void]): Future[void] {.gcsafe, upraises: [].}
OnProcessSlot* = proc(item: SlotQueueItem, done: Future[void]): Future[void] {.
gcsafe, async: (raises: [])
.}
# Non-ref obj copies value when assigned, preventing accidental modification
# of values which could cause an incorrect order (eg
@ -26,7 +26,7 @@ type
# but the heap invariant would no longer be honoured. When non-ref, the
# compiler can ensure that statement will fail).
SlotQueueWorker = object
doneProcessing*: Future[void]
doneProcessing*: Future[void].Raising([])
SlotQueueItem* = object
requestId: RequestId
@ -126,7 +126,17 @@ proc new*(
# `newAsyncQueue` procedure
proc init(_: type SlotQueueWorker): SlotQueueWorker =
SlotQueueWorker(doneProcessing: newFuture[void]("slotqueue.worker.processing"))
let workerFut = Future[void].Raising([]).init(
"slotqueue.worker.processing", {FutureFlag.OwnCancelSchedule}
)
workerFut.cancelCallback = proc(data: pointer) {.raises: [].} =
  # Completing (rather than failing) the future on cancellation keeps the
  # Raising([]) contract; it is equivalent to swallowing the error with
  # try: ... except CatchableError: discard
  trace "Cancelling `SlotQueue` worker processing future"
  if not workerFut.finished:
    workerFut.complete()
SlotQueueWorker(doneProcessing: workerFut)
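
The OwnCancelSchedule flag is the crux of this change: the future schedules its own cancellation, and the callback completes it instead of failing it, which is what lets raises: [] code await it safely. A standalone sketch of the idiom (names assumed):

let done = Future[void].Raising([]).init(
  "example.done", {FutureFlag.OwnCancelSchedule}
)
done.cancelCallback = proc(data: pointer) {.raises: [].} =
  # Cancellation completes rather than fails the future, preserving
  # the Raising([]) contract for any raises: [] proc awaiting it.
  if not done.finished:
    done.complete()
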
proc init*(
_: type SlotQueueItem,
@ -419,7 +429,6 @@ proc run(self: SlotQueue) {.async: (raises: []).} =
let fut = self.dispatch(worker, item)
self.trackedFutures.track(fut)
asyncSpawn fut
await sleepAsync(1.millis) # poll
except CancelledError:
@ -447,7 +456,6 @@ proc start*(self: SlotQueue) =
let fut = self.run()
self.trackedFutures.track(fut)
asyncSpawn fut
proc stop*(self: SlotQueue) {.async.} =
if not self.running:

View File

@ -315,13 +315,15 @@ proc new*[T, H](
cellSize = cellSize
if (manifest.blocksCount mod manifest.numSlots) != 0:
trace "Number of blocks must be divisable by number of slots."
return failure("Number of blocks must be divisable by number of slots.")
const msg = "Number of blocks must be divisible by number of slots."
trace msg
return failure(msg)
let cellSize = if manifest.verifiable: manifest.cellSize else: cellSize
if (manifest.blockSize mod cellSize) != 0.NBytes:
trace "Block size must be divisable by cell size."
return failure("Block size must be divisable by cell size.")
const msg = "Block size must be divisible by cell size."
trace msg
return failure(msg)
let
numSlotBlocks = manifest.numSlotBlocks

View File

@ -38,7 +38,9 @@ type
AnyProof* = CircomProof
AnySampler* = Poseidon2Sampler
# add any other generic type here, e.g. Poseidon2Sampler | ReinforceConcreteSampler
AnyBuilder* = Poseidon2Builder
# add any other generic type here, e.g. Poseidon2Builder | ReinforceConcreteBuilder
AnyProofInputs* = ProofInputs[Poseidon2Hash]
Prover* = ref object of RootObj

View File

@ -57,6 +57,8 @@ template withExceptions(body: untyped) =
raise newLPStreamEOFError()
except AsyncStreamError as exc:
raise newException(LPStreamError, exc.msg)
except CatchableError as exc:
raise newException(Defect, "Unexpected error in AsyncStreamWrapper", exc)
method readOnce*(
self: AsyncStreamWrapper, pbytes: pointer, nbytes: int
@ -74,11 +76,13 @@ method readOnce*(
proc completeWrite(
self: AsyncStreamWrapper, fut: Future[void], msgLen: int
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
withExceptions:
await fut
method write*(self: AsyncStreamWrapper, msg: seq[byte]): Future[void] =
method write*(
self: AsyncStreamWrapper, msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
# drives up memory usage
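
A minimal sketch of the raw-future idiom the comment refers to, using a hypothetical proc with chronos `sleepAsync` standing in for the real write: with raw: true no async state machine wraps the body, so arguments are not captured in an implicit closure.

proc ping(delay: Duration): Future[void] {.
    async: (raises: [CancelledError], raw: true)
.} =
  # The body runs synchronously and simply returns the future it builds;
  # nothing here is copied into a closure environment.
  sleepAsync(delay)
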

View File

@ -67,13 +67,9 @@ method atEof*(self: StoreStream): bool =
self.offset >= self.size
type LPStreamReadError* = object of LPStreamError
par*: ref CatchableError
proc newLPStreamReadError*(p: ref CatchableError): ref LPStreamReadError =
var w = newException(LPStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
result = w
newException(LPStreamReadError, "Read stream failed", p)
method readOnce*(
self: StoreStream, pbytes: pointer, nbytes: int

View File

@ -74,7 +74,6 @@ proc scheduler(machine: Machine) {.async: (raises: []).} =
debug "enter state", state = fromState & " => " & $machine.state
running = machine.run(machine.state)
machine.trackedFutures.track(running)
asyncSpawn running
except CancelledError:
break # do not propagate bc it is asyncSpawned
@ -88,7 +87,6 @@ proc start*(machine: Machine, initialState: State) =
machine.started = true
let fut = machine.scheduler()
machine.trackedFutures.track(fut)
asyncSpawn fut
machine.schedule(Event.transition(machine.state, initialState))
proc stop*(machine: Machine) {.async.} =

View File

@ -50,7 +50,6 @@ method start*(
timer.callback = callback
timer.interval = interval
timer.loopFuture = timerLoop(timer)
asyncSpawn timer.loopFuture
method stop*(timer: Timer) {.async, base.} =
if timer.loopFuture != nil and not timer.loopFuture.finished:

View File

@ -5,9 +5,11 @@ import ../logutils
{.push raises: [].}
type TrackedFutures* = ref object
futures: Table[uint, FutureBase]
cancelling: bool
type
TrackedFuture = Future[void].Raising([])
TrackedFutures* = ref object
futures: Table[uint, TrackedFuture]
cancelling: bool
logScope:
topics = "trackable futures"
@ -15,15 +17,18 @@ logScope:
proc len*(self: TrackedFutures): int =
self.futures.len
proc removeFuture(self: TrackedFutures, future: FutureBase) =
proc removeFuture(self: TrackedFutures, future: TrackedFuture) =
if not self.cancelling and not future.isNil:
self.futures.del(future.id)
proc track*[T](self: TrackedFutures, fut: Future[T]) =
proc track*(self: TrackedFutures, fut: TrackedFuture) =
if self.cancelling:
return
self.futures[fut.id] = FutureBase(fut)
if fut.finished:
return
self.futures[fut.id] = fut
proc cb(udata: pointer) =
self.removeFuture(fut)
@ -33,13 +38,8 @@ proc track*[T](self: TrackedFutures, fut: Future[T]) =
proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} =
self.cancelling = true
trace "cancelling tracked futures"
var cancellations: seq[FutureBase]
for future in self.futures.values:
if not future.isNil and not future.finished:
cancellations.add future.cancelAndWait()
trace "cancelling tracked futures", len = self.futures.len
let cancellations = self.futures.values.toSeq.mapIt(it.cancelAndWait())
await noCancel allFutures cancellations
self.futures.clear()
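
Taken together, the new TrackedFutures contract is: track only unfinished Future[void].Raising([]) instances, auto-remove them on completion, and cancel the rest on shutdown. A hypothetical component showing the intended lifecycle:

type MyService = ref object
  trackedFutures: TrackedFutures

proc loop(self: MyService) {.async: (raises: []).} =
  try:
    while true:
      await sleepAsync(1.seconds)
  except CancelledError:
    discard # stopped via cancelTracked

proc start(self: MyService) =
  # replaces the old `asyncSpawn self.loop()` pattern
  self.trackedFutures.track(self.loop())

proc stop(self: MyService) {.async.} =
  await self.trackedFutures.cancelTracked()
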

View File

@ -142,7 +142,6 @@ proc start*(validation: Validation) {.async.} =
await validation.subscribeSlotFilled()
await validation.restoreHistoricalState()
validation.running = validation.run()
asyncSpawn validation.running
proc stop*(validation: Validation) {.async.} =
if not validation.running.isNil and not validation.running.finished:

View File

@ -1,3 +1,3 @@
import pkg/asynctest/chronos/unittest
import pkg/asynctest/chronos/unittest2
export unittest
export unittest2

View File

@ -84,12 +84,12 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery.publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async, gcsafe.} =
): Future[void] {.async: (raises: [CancelledError]).} =
return
blockDiscovery.findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
await engine.resolveBlocks(blocks.filterIt(it.cid == cid))
await allFuturesThrowing(allFinished(pendingBlocks))
@ -97,17 +97,17 @@ asyncchecksuite "Block Advertising and Discovery":
await engine.stop()
test "Should advertise trees":
let
cids = @[manifest.treeCid]
advertised = initTable.collect:
for cid in cids:
{cid: newFuture[void]()}
let cids = @[manifest.treeCid]
var advertised = initTable.collect:
for cid in cids:
{cid: newFuture[void]()}
blockDiscovery.publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
) {.async: (raises: [CancelledError]).} =
advertised.withValue(cid, fut):
if not fut[].finished:
fut[].complete()
await engine.start()
await allFuturesThrowing(allFinished(toSeq(advertised.values)))
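
The tests above switch from a `cid in advertised` check plus `advertised[cid]` access to std/tables `withValue`, which does a single lookup and runs the body only when the key exists; a standalone sketch:

import std/tables

var counts = {"a": 1}.toTable
counts.withValue("a", v):
  # `v` is a pointer to the stored value; no second lookup, no KeyError
  v[] += 1
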
@ -118,7 +118,7 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery.publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
) {.async.} =
) {.async: (raises: [CancelledError]).} =
check:
cid notin blockCids
@ -138,7 +138,7 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery.findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
check false
await engine.start()
@ -221,17 +221,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
MockDiscovery(blockexc[1].engine.discovery.discovery).publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async.} =
) {.async: (raises: [CancelledError]).} =
advertised[cid] = switch[1].peerInfo.signedPeerRecord
MockDiscovery(blockexc[2].engine.discovery.discovery).publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async.} =
) {.async: (raises: [CancelledError]).} =
advertised[cid] = switch[2].peerInfo.signedPeerRecord
MockDiscovery(blockexc[3].engine.discovery.discovery).publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async.} =
) {.async: (raises: [CancelledError]).} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
@ -266,23 +266,21 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async.} =
if cid in advertised:
result.add(advertised[cid])
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
advertised.withValue(cid, val):
result.add(val[])
let futs = collect(newSeq):
for m in mBlocks[0 .. 2]:
blockexc[0].engine.requestBlock(m.cid)
await allFuturesThrowing(
switch.mapIt(it.start()) & blockexc.mapIt(it.engine.start())
)
.wait(10.seconds)
await allFuturesThrowing(switch.mapIt(it.start())).wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.start())).wait(10.seconds)
await allFutures(futs).wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.stop()) & switch.mapIt(it.stop()))
.wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.stop())).wait(10.seconds)
await allFuturesThrowing(switch.mapIt(it.stop())).wait(10.seconds)
test "E2E - Should advertise and discover blocks with peers already connected":
# Distribute the blocks amongst 1..3
@ -292,17 +290,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
MockDiscovery(blockexc[1].engine.discovery.discovery).publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async.} =
) {.async: (raises: [CancelledError]).} =
advertised[cid] = switch[1].peerInfo.signedPeerRecord
MockDiscovery(blockexc[2].engine.discovery.discovery).publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async.} =
) {.async: (raises: [CancelledError]).} =
advertised[cid] = switch[2].peerInfo.signedPeerRecord
MockDiscovery(blockexc[3].engine.discovery.discovery).publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
): Future[void] {.async.} =
) {.async: (raises: [CancelledError]).} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
@ -337,18 +335,16 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async.} =
if cid in advertised:
return @[advertised[cid]]
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
advertised.withValue(cid, val):
return @[val[]]
let futs = mBlocks[0 .. 2].mapIt(blockexc[0].engine.requestBlock(it.cid))
await allFuturesThrowing(
switch.mapIt(it.start()) & blockexc.mapIt(it.engine.start())
)
.wait(10.seconds)
await allFuturesThrowing(switch.mapIt(it.start())).wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.start())).wait(10.seconds)
await allFutures(futs).wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.stop()) & switch.mapIt(it.stop()))
.wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.stop())).wait(10.seconds)
await allFuturesThrowing(switch.mapIt(it.stop())).wait(10.seconds)

View File

@ -68,7 +68,7 @@ asyncchecksuite "Test Discovery Engine":
blockDiscovery.findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
pendingBlocks.resolve(
blocks.filterIt(it.cid == cid).mapIt(
BlockDelivery(blk: it, address: it.address)
@ -94,7 +94,7 @@ asyncchecksuite "Test Discovery Engine":
blockDiscovery.findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
check cid == blocks[0].cid
if not want.finished:
want.complete()
@ -122,7 +122,7 @@ asyncchecksuite "Test Discovery Engine":
var pendingCids = newSeq[Cid]()
blockDiscovery.findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
check cid in pendingCids
pendingCids.keepItIf(it != cid)
check peerStore.len < minPeers
@ -159,12 +159,12 @@ asyncchecksuite "Test Discovery Engine":
discoveryLoopSleep = 100.millis,
concurrentDiscReqs = 2,
)
reqs = newFuture[void]()
reqs = Future[void].Raising([CancelledError]).init()
count = 0
blockDiscovery.findBlockProvidersHandler = proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.gcsafe, async.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
check cid == blocks[0].cid
if count > 0:
check false

View File

@ -34,7 +34,7 @@ asyncchecksuite "Advertiser":
advertised = newSeq[Cid]()
blockDiscovery.publishBlockProvideHandler = proc(
d: MockDiscovery, cid: Cid
) {.async, gcsafe.} =
) {.async: (raises: [CancelledError]), gcsafe.} =
advertised.add(cid)
advertiser = Advertiser.new(localStore, blockDiscovery)

View File

@ -22,7 +22,7 @@ import ../../examples
const NopSendWantCancellationsProc = proc(
id: PeerId, addresses: seq[BlockAddress]
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
discard
asyncchecksuite "NetworkStore engine basic":
@ -66,20 +66,17 @@ asyncchecksuite "NetworkStore engine basic":
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
check addresses.mapIt($it.cidOrTreeCid).sorted == blocks.mapIt($it.cid).sorted
done.complete()
let
network = BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList))
localStore = CacheStore.new(blocks.mapIt(it))
discovery = DiscoveryEngine.new(
localStore, peerStore, network, blockDiscovery, pendingBlocks
)
advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(
localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
)
@ -93,7 +90,9 @@ asyncchecksuite "NetworkStore engine basic":
test "Should send account to new peers":
let pricing = Pricing.example
proc sendAccount(peer: PeerId, account: Account) {.gcsafe, async.} =
proc sendAccount(
peer: PeerId, account: Account
) {.async: (raises: [CancelledError]).} =
check account.address == pricing.address
done.complete()
@ -186,7 +185,9 @@ asyncchecksuite "NetworkStore engine handlers":
done = newFuture[void]()
wantList = makeWantList(blocks.mapIt(it.cid))
proc sendPresence(peerId: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} =
proc sendPresence(
peerId: PeerId, presence: seq[BlockPresence]
) {.async: (raises: [CancelledError]).} =
check presence.mapIt(it.address) == wantList.entries.mapIt(it.address)
done.complete()
@ -203,7 +204,9 @@ asyncchecksuite "NetworkStore engine handlers":
done = newFuture[void]()
wantList = makeWantList(blocks.mapIt(it.cid), sendDontHave = true)
proc sendPresence(peerId: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} =
proc sendPresence(
peerId: PeerId, presence: seq[BlockPresence]
) {.async: (raises: [CancelledError]).} =
check presence.mapIt(it.address) == wantList.entries.mapIt(it.address)
for p in presence:
check:
@ -222,7 +225,9 @@ asyncchecksuite "NetworkStore engine handlers":
done = newFuture[void]()
wantList = makeWantList(blocks.mapIt(it.cid), sendDontHave = true)
proc sendPresence(peerId: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} =
proc sendPresence(
peerId: PeerId, presence: seq[BlockPresence]
) {.async: (raises: [CancelledError]).} =
for p in presence:
if p.address.cidOrTreeCid != blocks[0].cid and
p.address.cidOrTreeCid != blocks[1].cid:
@ -266,19 +271,21 @@ asyncchecksuite "NetworkStore engine handlers":
peerContext.account = account.some
peerContext.blocks = blocks.mapIt(
(it.address, Presence(address: it.address, price: rand(uint16).u256))
(it.address, Presence(address: it.address, price: rand(uint16).u256, have: true))
).toTable
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendPayment: proc(receiver: PeerId, payment: SignedState) {.gcsafe, async.} =
sendPayment: proc(
receiver: PeerId, payment: SignedState
) {.async: (raises: [CancelledError]).} =
let
amount = blocks.mapIt(peerContext.blocks[it.address].price).foldl(a + b)
amount =
blocks.mapIt(peerContext.blocks[it.address].catch.get.price).foldl(a + b)
balances = !payment.state.outcome.balances(Asset)
check receiver == peerId
check balances[account.address.toDestination] == amount
check balances[account.address.toDestination].catch.get == amount
done.complete(),
# Install NOP for want list cancellations so they don't cause a crash
@ -286,10 +293,12 @@ asyncchecksuite "NetworkStore engine handlers":
)
)
let requestedBlocks = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.address))
await engine.blocksDeliveryHandler(
peerId, blocks.mapIt(BlockDelivery(blk: it, address: it.address))
)
await done.wait(100.millis)
await allFuturesThrowing(requestedBlocks).wait(100.millis)
test "Should handle block presence":
var handles:
@ -303,7 +312,7 @@ asyncchecksuite "NetworkStore engine handlers":
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
engine.pendingBlocks.resolve(
blocks.filterIt(it.address in addresses).mapIt(
BlockDelivery(blk: it, address: it.address)
@ -340,9 +349,9 @@ asyncchecksuite "NetworkStore engine handlers":
proc sendWantCancellations(
id: PeerId, addresses: seq[BlockAddress]
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
for address in addresses:
cancellations[address].complete()
cancellations[address].catch.expect("address should exist").complete()
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: sendWantCancellations)
@ -416,7 +425,7 @@ asyncchecksuite "Block Download":
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
check wantType == WantHave
check not engine.pendingBlocks.isInFlight(address)
check engine.pendingBlocks.retries(address) == retries
@ -433,7 +442,7 @@ asyncchecksuite "Block Download":
discard (await pending).tryGet()
test "Should retry block request":
let
var
address = BlockAddress.init(blocks[0].cid)
steps = newAsyncEvent()
@ -445,7 +454,7 @@ asyncchecksuite "Block Download":
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
case wantType
of WantHave:
check engine.pendingBlocks.isInFlight(address) == false
@ -467,7 +476,7 @@ asyncchecksuite "Block Download":
let pending = engine.requestBlock(address)
await steps.wait()
# add blocks presence
peerCtx.blocks = blocks.mapIt(
(it.address, Presence(address: it.address, have: true, price: UInt256.example))
).toTable
@ -493,7 +502,7 @@ asyncchecksuite "Block Download":
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
done.complete()
engine.pendingBlocks.blockRetries = 10
@ -573,7 +582,7 @@ asyncchecksuite "Task Handler":
test "Should send want-blocks in priority order":
proc sendBlocksDelivery(
id: PeerId, blocksDelivery: seq[BlockDelivery]
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
check blocksDelivery.len == 2
check:
blocksDelivery[1].address == blocks[0].address
@ -610,7 +619,7 @@ asyncchecksuite "Task Handler":
test "Should set in-flight for outgoing blocks":
proc sendBlocksDelivery(
id: PeerId, blocksDelivery: seq[BlockDelivery]
) {.gcsafe, async.} =
) {.async: (raises: [CancelledError]).} =
check peersCtx[0].peerWants[0].inFlight
for blk in blocks:
@ -649,7 +658,9 @@ asyncchecksuite "Task Handler":
let missing = @[Block.new("missing".toBytes).tryGet()]
let price = (!engine.pricing).price
proc sendPresence(id: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} =
proc sendPresence(
id: PeerId, presence: seq[BlockPresence]
) {.async: (raises: [CancelledError]).} =
check presence.mapIt(!Presence.init(it)) ==
@[
Presence(address: present[0].address, have: true, price: price),

View File

@ -1,10 +1,10 @@
import std/unittest
import pkg/unittest2
import pkg/codex/stores
import ../../examples
import ../../helpers
checksuite "engine payments":
suite "Engine payments":
let address = EthAddress.example
let amount = 42.u256

View File

@ -6,7 +6,7 @@ import ../../../asynctest
import ../../examples
import ../../helpers
checksuite "account protobuf messages":
suite "account protobuf messages":
let account = Account(address: EthAddress.example)
let message = AccountMessage.init(account)
@ -21,7 +21,7 @@ checksuite "account protobuf messages":
incorrect.address.del(0)
check Account.init(incorrect).isNone
checksuite "channel update messages":
suite "channel update messages":
let state = SignedState.example
let update = StateChannelUpdate.init(state)

View File

@ -6,7 +6,7 @@ import ../../../asynctest
import ../../examples
import ../../helpers
checksuite "block presence protobuf messages":
suite "block presence protobuf messages":
let
cid = Cid.example
address = BlockAddress(leaf: false, cid: cid)

View File

@ -26,7 +26,7 @@ asyncchecksuite "Network - Handlers":
blocks: seq[bt.Block]
done: Future[void]
proc getConn(): Future[Connection] {.async.} =
proc getConn(): Future[Connection] {.async: (raises: [CancelledError]).} =
return Connection(buffer)
setup:
@ -45,7 +45,7 @@ asyncchecksuite "Network - Handlers":
discard await networkPeer.connect()
test "Want List handler":
proc wantListHandler(peer: PeerId, wantList: WantList) {.gcsafe, async.} =
proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} =
# check that we got the correct amount of entries
check wantList.entries.len == 4
@ -72,7 +72,7 @@ asyncchecksuite "Network - Handlers":
test "Blocks Handler":
proc blocksDeliveryHandler(
peer: PeerId, blocksDelivery: seq[BlockDelivery]
) {.gcsafe, async.} =
) {.async: (raises: []).} =
check blocks == blocksDelivery.mapIt(it.blk)
done.complete()
@ -85,7 +85,9 @@ asyncchecksuite "Network - Handlers":
await done.wait(500.millis)
test "Presence Handler":
proc presenceHandler(peer: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} =
proc presenceHandler(
peer: PeerId, presence: seq[BlockPresence]
) {.async: (raises: []).} =
for b in blocks:
check:
b.address in presence
@ -105,7 +107,7 @@ asyncchecksuite "Network - Handlers":
test "Handles account messages":
let account = Account(address: EthAddress.example)
proc handleAccount(peer: PeerId, received: Account) {.gcsafe, async.} =
proc handleAccount(peer: PeerId, received: Account) {.async: (raises: []).} =
check received == account
done.complete()
@ -119,7 +121,7 @@ asyncchecksuite "Network - Handlers":
test "Handles payment messages":
let payment = SignedState.example
proc handlePayment(peer: PeerId, received: SignedState) {.gcsafe, async.} =
proc handlePayment(peer: PeerId, received: SignedState) {.async: (raises: []).} =
check received == payment
done.complete()
@ -165,7 +167,7 @@ asyncchecksuite "Network - Senders":
await allFuturesThrowing(switch1.stop(), switch2.stop())
test "Send want list":
proc wantListHandler(peer: PeerId, wantList: WantList) {.gcsafe, async.} =
proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} =
# check that we got the correct amount of entries
check wantList.entries.len == 4
@ -195,7 +197,7 @@ asyncchecksuite "Network - Senders":
test "send blocks":
proc blocksDeliveryHandler(
peer: PeerId, blocksDelivery: seq[BlockDelivery]
) {.gcsafe, async.} =
) {.async: (raises: []).} =
check blocks == blocksDelivery.mapIt(it.blk)
done.complete()
@ -207,7 +209,9 @@ asyncchecksuite "Network - Senders":
await done.wait(500.millis)
test "send presence":
proc presenceHandler(peer: PeerId, presence: seq[BlockPresence]) {.gcsafe, async.} =
proc presenceHandler(
peer: PeerId, presence: seq[BlockPresence]
) {.async: (raises: []).} =
for b in blocks:
check:
b.address in presence
@ -226,7 +230,7 @@ asyncchecksuite "Network - Senders":
test "send account":
let account = Account(address: EthAddress.example)
proc handleAccount(peer: PeerId, received: Account) {.gcsafe, async.} =
proc handleAccount(peer: PeerId, received: Account) {.async: (raises: []).} =
check received == account
done.complete()
@ -238,7 +242,7 @@ asyncchecksuite "Network - Senders":
test "send payment":
let payment = SignedState.example
proc handlePayment(peer: PeerId, received: SignedState) {.gcsafe, async.} =
proc handlePayment(peer: PeerId, received: SignedState) {.async: (raises: []).} =
check received == payment
done.complete()
@ -276,7 +280,7 @@ asyncchecksuite "Network - Test Limits":
let account = Account(address: EthAddress.example)
network2.handlers.onAccount = proc(
peer: PeerId, received: Account
) {.gcsafe, async.} =
) {.async: (raises: []).} =
check false
let fut = network1.send(

View File

@ -1,7 +1,7 @@
import std/sugar
import std/sequtils
import std/unittest
import pkg/unittest2
import pkg/libp2p
import pkg/codex/blockexchange/peers
@ -11,7 +11,7 @@ import pkg/codex/blockexchange/protobuf/presence
import ../helpers
import ../examples
checksuite "Peer Context Store":
suite "Peer Context Store":
var
store: PeerCtxStore
peerCtx: BlockExcPeerCtx
@ -31,7 +31,7 @@ checksuite "Peer Context Store":
test "Should get peer":
check store.get(peerCtx.id) == peerCtx
checksuite "Peer Context Store Peer Selection":
suite "Peer Context Store Peer Selection":
var
store: PeerCtxStore
peerCtxs: seq[BlockExcPeerCtx]

View File

@ -10,7 +10,7 @@ import pkg/codex/blockexchange
import ../helpers
import ../../asynctest
checksuite "Pending Blocks":
suite "Pending Blocks":
test "Should add want handle":
let
pendingBlocks = PendingBlocksManager.new()

View File

@ -21,7 +21,7 @@ proc new*(
var consumed = 0
proc reader(
data: ChunkBuffer, len: int
): Future[int] {.async, gcsafe, raises: [Defect].} =
): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} =
if consumed >= dataset.len:
return 0

View File

@ -14,29 +14,42 @@ import pkg/codex/discovery
import pkg/contractabi/address as ca
type MockDiscovery* = ref object of Discovery
findBlockProvidersHandler*:
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.gcsafe.}
publishBlockProvideHandler*: proc(d: MockDiscovery, cid: Cid): Future[void] {.gcsafe.}
findHostProvidersHandler*:
proc(d: MockDiscovery, host: ca.Address): Future[seq[SignedPeerRecord]] {.gcsafe.}
publishHostProvideHandler*:
proc(d: MockDiscovery, host: ca.Address): Future[void] {.gcsafe.}
findBlockProvidersHandler*: proc(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).}
publishBlockProvideHandler*:
proc(d: MockDiscovery, cid: Cid): Future[void] {.async: (raises: [CancelledError]).}
findHostProvidersHandler*: proc(
d: MockDiscovery, host: ca.Address
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).}
publishHostProvideHandler*: proc(d: MockDiscovery, host: ca.Address): Future[void] {.
async: (raises: [CancelledError])
.}
proc new*(T: type MockDiscovery): MockDiscovery =
MockDiscovery()
proc findPeer*(d: Discovery, peerId: PeerId): Future[?PeerRecord] {.async.} =
proc findPeer*(
d: Discovery, peerId: PeerId
): Future[?PeerRecord] {.async: (raises: [CancelledError]).} =
## Mock find a peer - always returns none
##
return none(PeerRecord)
method find*(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
method find*(
d: MockDiscovery, cid: Cid
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
if isNil(d.findBlockProvidersHandler):
return
return await d.findBlockProvidersHandler(d, cid)
method provide*(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
method provide*(
d: MockDiscovery, cid: Cid
): Future[void] {.async: (raises: [CancelledError]).} =
if isNil(d.publishBlockProvideHandler):
return
@ -44,13 +57,15 @@ method provide*(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
method find*(
d: MockDiscovery, host: ca.Address
): Future[seq[SignedPeerRecord]] {.async.} =
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
if isNil(d.findHostProvidersHandler):
return
return await d.findHostProvidersHandler(d, host)
method provide*(d: MockDiscovery, host: ca.Address): Future[void] {.async.} =
method provide*(
d: MockDiscovery, host: ca.Address
): Future[void] {.async: (raises: [CancelledError]).} =
if isNil(d.publishHostProvideHandler):
return

View File

@ -26,7 +26,7 @@ proc new*(
var consumed = 0
proc reader(
data: ChunkBuffer, len: int
): Future[int] {.async, gcsafe, raises: [Defect].} =
): Future[int] {.async: (raises: [ChunkerError, CancelledError]), gcsafe.} =
var alpha = toSeq(byte('A') .. byte('z'))
if consumed >= size:

View File

@ -1,4 +1,4 @@
import std/unittest
import pkg/unittest2
import pkg/codex/merkletree

View File

@ -1,4 +1,4 @@
import std/unittest
import pkg/unittest2
import pkg/questionable/results
import pkg/stew/byteutils
@ -18,7 +18,7 @@ const data = [
"00000000000000000000000000000009".toBytes, "00000000000000000000000000000010".toBytes,
]
checksuite "merkletree - coders":
suite "merkletree - coders":
test "encoding and decoding a tree yields the same tree":
let
tree = CodexTree.init(Sha256HashCodec, data).tryGet()

View File

@ -1,6 +1,6 @@
import std/unittest
import std/sequtils
import pkg/unittest2
import pkg/questionable/results
import pkg/stew/byteutils
import pkg/libp2p

View File

@ -1,7 +1,7 @@
import std/unittest
import std/sequtils
import std/random
import pkg/unittest2
import pkg/poseidon2
import pkg/poseidon2/sponge

View File

@ -1,6 +1,6 @@
import std/unittest
import std/sequtils
import pkg/unittest2
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/questionable/results

View File

@ -1,4 +1,4 @@
import std/unittest
import pkg/unittest2
import pkg/questionable
import pkg/codex/contracts/requests
import pkg/codex/sales/states/cancelled
@ -8,7 +8,7 @@ import pkg/codex/sales/states/filled
import ../../examples
import ../../helpers
checksuite "sales state 'downloading'":
suite "sales state 'downloading'":
let request = StorageRequest.example
let slotIndex = request.ask.slots div 2
var state: SaleDownloading

View File

@ -14,7 +14,7 @@ import ../../helpers/mockmarket
import ../../examples
import ../../helpers
checksuite "sales state 'filled'":
suite "sales state 'filled'":
let request = StorageRequest.example
let slotIndex = request.ask.slots div 2

View File

@ -1,4 +1,4 @@
import std/unittest
import pkg/unittest2
import pkg/questionable
import pkg/codex/contracts/requests
import pkg/codex/sales/states/filling
@ -7,7 +7,7 @@ import pkg/codex/sales/states/failed
import ../../examples
import ../../helpers
checksuite "sales state 'filling'":
suite "sales state 'filling'":
let request = StorageRequest.example
let slotIndex = request.ask.slots div 2
var state: SaleFilling

View File

@ -14,7 +14,7 @@ import ../../helpers/mockmarket
import ../../examples
import ../../helpers
checksuite "sales state 'unknown'":
suite "sales state 'unknown'":
let request = StorageRequest.example
let slotIndex = request.ask.slots div 2
let slotId = slotId(request.id, slotIndex)

View File

@ -236,10 +236,17 @@ asyncchecksuite "Sales":
return true
proc addRequestToSaturatedQueue(): Future[StorageRequest] {.async.} =
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(10.millis)
itemsProcessed.add item
done.complete()
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
try:
await sleepAsync(10.millis)
itemsProcessed.add item
except CancelledError as exc:
checkpoint(exc.msg)
finally:
if not done.finished:
done.complete()
var request1 = StorageRequest.example
request1.ask.collateralPerByte = request.ask.collateralPerByte + 1
@ -261,9 +268,12 @@ asyncchecksuite "Sales":
waitFor run()
test "processes all request's slots once StorageRequested emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
itemsProcessed.add item
done.complete()
if not done.finished:
done.complete()
createAvailability()
await market.requestStorage(request)
let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot)
@ -299,9 +309,12 @@ asyncchecksuite "Sales":
check always (not itemsProcessed.contains(expected))
test "adds slot index to slot queue once SlotFreed emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
itemsProcessed.add item
done.complete()
if not done.finished:
done.complete()
createAvailability()
market.requested.add request # "contract" must be able to return request

View File

@ -50,12 +50,19 @@ suite "Slot queue start/stop":
suite "Slot queue workers":
var queue: SlotQueue
proc onProcessSlot(item: SlotQueueItem, doneProcessing: Future[void]) {.async.} =
await sleepAsync(1000.millis)
proc onProcessSlot(
item: SlotQueueItem, doneProcessing: Future[void]
) {.async: (raises: []).} =
# This is not illustrative of the realistic scenario: in practice the
# `doneProcessing` future is passed to another context before being
# completed, so it is not as simple as completing it in the callback
doneProcessing.complete()
try:
await sleepAsync(1000.millis)
except CatchableError as exc:
checkpoint(exc.msg)
finally:
if not doneProcessing.finished:
doneProcessing.complete()
setup:
let request = StorageRequest.example
@ -89,9 +96,14 @@ suite "Slot queue workers":
check eventually queue.activeWorkers == 3
test "discards workers once processing completed":
proc processSlot(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(1.millis)
done.complete()
proc processSlot(item: SlotQueueItem, done: Future[void]) {.async: (raises: []).} =
try:
await sleepAsync(1.millis)
except CatchableError as exc:
checkpoint(exc.msg)
finally:
if not done.finished:
done.complete()
queue.onProcessSlot = processSlot
@ -114,11 +126,19 @@ suite "Slot queue":
proc newSlotQueue(maxSize, maxWorkers: int, processSlotDelay = 1.millis) =
queue = SlotQueue.new(maxWorkers, maxSize.uint16)
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(processSlotDelay)
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete()
queue.onProcessSlot = proc(
item: SlotQueueItem, done: Future[void]
) {.async: (raises: []).} =
try:
await sleepAsync(processSlotDelay)
except CatchableError as exc:
checkpoint(exc.msg)
finally:
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
if not done.finished:
done.complete()
queue.start()
setup:

View File

@ -133,7 +133,7 @@ suite "Slot builder":
check:
Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg ==
"Number of blocks must be divisable by number of slots."
"Number of blocks must be divisible by number of slots."
test "Block size must be divisable by cell size":
let mismatchManifest = Manifest.new(
@ -151,7 +151,7 @@ suite "Slot builder":
check:
Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg ==
"Block size must be divisable by cell size."
"Block size must be divisible by cell size."
test "Should build correct slot builder":
builder =

View File

@ -1,6 +1,6 @@
import std/unittest
import std/random
import pkg/unittest2
import pkg/stew/objects
import pkg/questionable
import pkg/questionable/results
@ -11,7 +11,7 @@ import pkg/codex/stores/repostore/coders
import ../../helpers
checksuite "Test coders":
suite "Test coders":
proc rand(T: type NBytes): T =
rand(Natural).NBytes

View File

@ -11,7 +11,7 @@ import ./commonstoretests
import ../../asynctest
import ../helpers
checksuite "Cache Store":
suite "Cache Store":
var
newBlock, newBlock1, newBlock2, newBlock3: Block
store: CacheStore

View File

@ -36,7 +36,7 @@ proc createManifestCid(): ?!Cid =
let cid = ?Cid.init(version, codec, hash).mapFailure
return success cid
checksuite "KeyUtils":
suite "KeyUtils":
test "makePrefixKey should create block key":
let length = 6
let cid = Cid.example

View File

@ -21,7 +21,7 @@ import ../examples
import codex/stores/maintenance
checksuite "BlockMaintainer":
suite "BlockMaintainer":
var mockRepoStore: MockRepoStore
var interval: Duration
var mockTimer: MockTimer

View File

@ -24,7 +24,7 @@ import ../helpers/mockclock
import ../examples
import ./commonstoretests
checksuite "Test RepoStore start/stop":
suite "Test RepoStore start/stop":
var
repoDs: Datastore
metaDs: Datastore

View File

@ -22,7 +22,7 @@ proc toSortedSeq[T](h: AsyncHeapQueue[T], queueType = QueueType.Min): seq[T] =
while tmp.len > 0:
result.add(popNoWait(tmp).tryGet())
checksuite "Synchronous tests":
suite "Synchronous tests":
test "Test pushNoWait - Min":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]

View File

@ -27,7 +27,7 @@ asyncchecksuite "Chunking":
let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0]
proc reader(
data: ChunkBuffer, len: int
): Future[int] {.gcsafe, async, raises: [Defect].} =
): Future[int] {.gcsafe, async: (raises: [ChunkerError, CancelledError]).} =
let read = min(contents.len - offset, len)
if read == 0:
return 0
@ -97,8 +97,13 @@ asyncchecksuite "Chunking":
discard (await chunker.getBytes())
test "stream should forward LPStreamError":
expect LPStreamError:
try:
await raiseStreamException(newException(LPStreamError, "test error"))
except ChunkerError as exc:
check exc.parent of LPStreamError
except CatchableError as exc:
checkpoint("Unexpected error: " & exc.msg)
fail()
test "stream should catch LPStreamEOFError":
await raiseStreamException(newException(LPStreamEOFError, "test error"))
@ -106,7 +111,3 @@ asyncchecksuite "Chunking":
test "stream should forward CancelledError":
expect CancelledError:
await raiseStreamException(newException(CancelledError, "test error"))
test "stream should forward LPStreamError":
expect LPStreamError:
await raiseStreamException(newException(LPStreamError, "test error"))

View File

@ -1,9 +1,9 @@
import std/unittest
import pkg/unittest2
import codex/clock
import ./helpers
checksuite "Clock":
suite "Clock":
proc testConversion(seconds: SecondsSince1970) =
let asBytes = seconds.toBytes

View File

@ -1,6 +1,7 @@
import std/options
import std/strutils
import std/unittest
import pkg/unittest2
import pkg/codex/blocktype
import pkg/codex/conf
import pkg/codex/contracts/requests

View File

@ -13,7 +13,7 @@ import ../asynctest
import ./helpers
import ./examples
checksuite "Manifest":
suite "Manifest":
let
manifest =
Manifest.new(treeCid = Cid.example, blockSize = 1.MiBs, datasetSize = 100.MiBs)

View File

@ -116,7 +116,7 @@ asyncchecksuite "Purchasing":
await purchase.wait()
check market.withdrawn == @[request.id]
checksuite "Purchasing state machine":
suite "Purchasing state machine":
var purchasing: Purchasing
var market: MockMarket
var clock: MockClock

View File

@ -1,10 +1,10 @@
import std/times
import std/unittest
import codex/systemclock
import pkg/unittest2
import pkg/codex/systemclock
import ./helpers
checksuite "SystemClock":
suite "SystemClock":
test "Should get now":
let clock = SystemClock.new()

View File

@ -7,7 +7,7 @@ import pkg/codex/utils/iter
import ../../asynctest
import ../helpers
checksuite "Test Iter":
suite "Test Iter":
test "Should be finished":
let iter = Iter[int].empty()

View File

@ -1,12 +1,14 @@
import std/unittest
import std/os
import codex/utils/keyutils
import pkg/unittest2
import pkg/codex/utils/keyutils
import ../helpers
when defined(windows):
import stew/windows/acl
checksuite "keyutils":
suite "keyutils":
let path = getTempDir() / "CodexTest"
setup:

View File

@ -1,8 +1,9 @@
import std/unittest
import codex/utils/options
import pkg/unittest2
import pkg/codex/utils/options
import ../helpers
checksuite "optional casts":
suite "optional casts":
test "casting value to same type works":
check 42 as int == some 42
@ -31,7 +32,7 @@ checksuite "optional casts":
check 42.some as string == string.none
check int.none as int == int.none
checksuite "Optionalize":
suite "Optionalize":
test "does not except non-object types":
static:
doAssert not compiles(Optionalize(int))

View File

@ -17,47 +17,71 @@ asyncchecksuite "tracked futures":
check module.trackedFutures.len == 0
test "tracks unfinished futures":
let fut = newFuture[void]("test")
let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule})
module.trackedFutures.track(fut)
check module.trackedFutures.len == 1
test "does not track completed futures":
let fut = newFuture[void]("test")
let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule})
fut.complete()
module.trackedFutures.track(fut)
check eventually module.trackedFutures.len == 0
test "does not track failed futures":
let fut = newFuture[void]("test")
fut.fail((ref CatchableError)(msg: "some error"))
module.trackedFutures.track(fut)
check eventually module.trackedFutures.len == 0
check module.trackedFutures.len == 0
test "does not track cancelled futures":
let fut = newFuture[void]("test")
let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule})
fut.cancelCallback = proc(data: pointer) =
fut.cancelAndSchedule() # manually schedule the cancel
await fut.cancelAndWait()
module.trackedFutures.track(fut)
check eventually module.trackedFutures.len == 0
test "removes tracked future when finished":
let fut = newFuture[void]("test")
let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule})
module.trackedFutures.track(fut)
check module.trackedFutures.len == 1
fut.complete()
check eventually module.trackedFutures.len == 0
test "removes tracked future when cancelled":
let fut = newFuture[void]("test")
let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule})
fut.cancelCallback = proc(data: pointer) =
fut.cancelAndSchedule() # manually schedule the cancel
module.trackedFutures.track(fut)
check module.trackedFutures.len == 1
await fut.cancelAndWait()
check eventually module.trackedFutures.len == 0
test "completed and removes future on cancel":
let fut = Future[void].Raising([]).init("test", {FutureFlag.OwnCancelSchedule})
fut.cancelCallback = proc(data: pointer) =
fut.complete()
module.trackedFutures.track(fut)
check module.trackedFutures.len == 1
await fut.cancelAndWait()
check eventually module.trackedFutures.len == 0
test "cancels and removes all tracked futures":
let fut1 = newFuture[void]("test1")
let fut2 = newFuture[void]("test2")
let fut3 = newFuture[void]("test3")
let fut1 = Future[void].Raising([]).init("test1", {FutureFlag.OwnCancelSchedule})
fut1.cancelCallback = proc(data: pointer) =
fut1.cancelAndSchedule() # manually schedule the cancel
let fut2 = Future[void].Raising([]).init("test2", {FutureFlag.OwnCancelSchedule})
fut2.cancelCallback = proc(data: pointer) =
fut2.cancelAndSchedule() # manually schedule the cancel
let fut3 = Future[void].Raising([]).init("test3", {FutureFlag.OwnCancelSchedule})
fut3.cancelCallback = proc(data: pointer) =
fut3.cancelAndSchedule() # manually schedule the cancel
module.trackedFutures.track(fut1)
check module.trackedFutures.len == 1
module.trackedFutures.track(fut2)
check module.trackedFutures.len == 2
module.trackedFutures.track(fut3)
check module.trackedFutures.len == 3
await module.trackedFutures.cancelTracked()
check eventually fut1.cancelled
check eventually fut2.cancelled

View File

@ -1,4 +1,4 @@
import std/unittest
import pkg/unittest2
import pkg/codex/utils

View File

@ -2,4 +2,36 @@ import helpers/multisetup
import helpers/trackers
import helpers/templeveldb
import std/sequtils, chronos
export multisetup, trackers, templeveldb
### taken from libp2p errorhelpers.nim
proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
# This proc is only meant for use in tests / not suitable for general use.
# - Swallowing errors arbitrarily instead of aggregating them is bad design
# - It raises `CatchableError` instead of the union of the `futs` errors,
# inflating the caller's `raises` list unnecessarily. `macro` could fix it
let futs = @args
(
proc() {.async: (raises: [CatchableError]).} =
await allFutures(futs)
var firstErr: ref CatchableError
for fut in futs:
if fut.failed:
let err = fut.error()
if err of CancelledError:
raise err
if firstErr == nil:
firstErr = err
if firstErr != nil:
raise firstErr
)()
proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] =
allFuturesThrowing(futs.mapIt(FutureBase(it)))
proc allFuturesThrowing*[T, E]( # https://github.com/nim-lang/Nim/issues/23432
futs: varargs[InternalRaisesFuture[T, E]]
): Future[void] =
allFuturesThrowing(futs.mapIt(FutureBase(it)))
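
Usage in the suites above follows this shape: settle a whole group of futures, then raise the first aggregated failure (illustrative lines taken from the discovery tests):

# Start everything, then fail the test on the first aggregated error.
await allFuturesThrowing(switch.mapIt(it.start())).wait(10.seconds)
await allFuturesThrowing(blockexc.mapIt(it.engine.start())).wait(10.seconds)
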

View File

@ -1,5 +1,5 @@
import pkg/codex/streams/storestream
import std/unittest
import pkg/unittest2
# From lip2p/tests/helpers
const trackerNames = [StoreStreamTrackerName]

vendor/nim-serde vendored

@ -1 +1 @@
Subproject commit c82e85c62436218592fbe876df5ac389ef8b964b
Subproject commit 5ced7c88b97d99c582285ce796957fb71fd42434