From a609baea261b65673a1c7cf899710bbf6bd32924 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 24 Feb 2025 15:01:23 -0600 Subject: [PATCH 1/7] Add basic retry functionality (#1119) * adding basic retry functionality * avoid duplicate requests and batch them * fix cancelling blocks * properly resolve blocks * minor cleanup - use `self` * avoid useless asyncSpawn * track retries * limit max inflight and set libp2p maxIncomingStreams * cleanup * add basic yield in readLoop * use tuple instead of object * cleanup imports and logs * increase defaults * wip * fix prefetch batching * cleanup * decrease timeouts to speedup tests * remove outdated test * add retry tests * should track retries * remove useless test * use correct block address (index was off by 1) * remove duplicate noop proc * add BlockHandle type * Use BlockHandle type * add fetchLocal to control batching from local store * add format target * revert deps * adjust quotaMaxBytes * cleanup imports and logs * revert deps * cleanup blocks on cancelled * terminate erasure and prefetch jobs on stream end * split storing and retrieving data into separate tests * track `b.discoveryLoop` future * misc * remove useless check --- .gitignore | 2 + Makefile | 5 + codex/blockexchange/engine/discovery.nim | 1 - codex/blockexchange/engine/engine.nim | 378 ++++++++++-------- codex/blockexchange/engine/pendingblocks.nim | 146 ++++--- codex/blockexchange/network/network.nim | 63 ++- codex/blockexchange/network/networkpeer.nim | 29 +- codex/blockexchange/peers/peerctxstore.nim | 10 +- codex/codex.nim | 2 +- codex/conf.nim | 12 +- codex/logutils.nim | 2 +- codex/node.nim | 89 +++-- codex/rest/api.nim | 6 +- codex/rng.nim | 9 + codex/slots/builder/builder.nim | 2 +- codex/stores/maintenance.nim | 4 +- codex/stores/networkstore.nim | 8 + codex/stores/repostore/types.nim | 4 +- codex/utils/asyncspawn.nim | 10 - codex/utils/natutils.nim | 3 +- .../discovery/testdiscoveryengine.nim | 4 +- .../blockexchange/engine/testblockexc.nim | 46 +-- .../codex/blockexchange/engine/testengine.nim | 178 ++++++++- .../codex/blockexchange/testpendingblocks.nim | 35 +- tests/codex/node/helpers.nim | 2 +- tests/codex/node/testnode.nim | 49 ++- tests/integration/testrestapi.nim | 2 +- vendor/nim-serde | 2 +- 28 files changed, 697 insertions(+), 406 deletions(-) delete mode 100644 codex/utils/asyncspawn.nim diff --git a/.gitignore b/.gitignore index 0e1f27db..f6292dda 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,5 @@ docker/prometheus-data .DS_Store nim.cfg tests/integration/logs + +data/ diff --git a/Makefile b/Makefile index 3dfe8e7e..29d6c11d 100644 --- a/Makefile +++ b/Makefile @@ -229,6 +229,11 @@ nph/%: build-nph echo -e $(FORMAT_MSG) "nph/$*" && \ $(NPH) $* +format: + $(NPH) *.nim + $(NPH) codex/ + $(NPH) tests/ + clean-nph: rm -f $(NPH) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index ba773ac5..c664f212 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -144,7 +144,6 @@ proc start*(b: DiscoveryEngine) {.async.} = b.discoveryLoop = b.discoveryQueueLoop() b.trackedFutures.track(b.discoveryLoop) - asyncSpawn b.discoveryLoop proc stop*(b: DiscoveryEngine) {.async.} = ## Stop the discovery engine diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index d30f88d9..dafdd520 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -19,6 +19,7 @@ import pkg/metrics import pkg/stint import pkg/questionable +import ../../rng import ../../stores/blockstore import ../../blocktype import ../../utils @@ -67,12 +68,6 @@ const DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 - # DefaultMaxRetries = 3 - # DefaultConcurrentDiscRequests = 10 - # DefaultConcurrentAdvertRequests = 10 - # DefaultDiscoveryTimeout = 1.minutes - # DefaultMaxQueriedBlocksCache = 1000 - # DefaultMinPeersPerBlock = 3 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} @@ -88,10 +83,8 @@ type trackedFutures: TrackedFutures # Tracks futures of blockexc tasks blockexcRunning: bool # Indicates if the blockexc task is running pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved - peersPerRequest: int # Max number of peers to request from wallet*: WalletRef # Nitro wallet for micropayments pricing*: ?Pricing # Optional bandwidth pricing - blockFetchTimeout*: Duration # Timeout for fetching blocks over the network discovery*: DiscoveryEngine advertiser*: Advertiser @@ -100,124 +93,147 @@ type price*: UInt256 # attach task scheduler to engine -proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} = - b.taskQueue.pushOrUpdateNoWait(task).isOk() +proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} = + self.taskQueue.pushOrUpdateNoWait(task).isOk() -proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} +proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} -proc start*(b: BlockExcEngine) {.async.} = +proc start*(self: BlockExcEngine) {.async.} = ## Start the blockexc task ## - await b.discovery.start() - await b.advertiser.start() + await self.discovery.start() + await self.advertiser.start() - trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks - if b.blockexcRunning: + trace "Blockexc starting with concurrent tasks", tasks = self.concurrentTasks + if self.blockexcRunning: warn "Starting blockexc twice" return - b.blockexcRunning = true - for i in 0 ..< b.concurrentTasks: - let fut = b.blockexcTaskRunner() - b.trackedFutures.track(fut) - asyncSpawn fut + self.blockexcRunning = true + for i in 0 ..< self.concurrentTasks: + let fut = self.blockexcTaskRunner() + self.trackedFutures.track(fut) -proc stop*(b: BlockExcEngine) {.async.} = +proc stop*(self: BlockExcEngine) {.async.} = ## Stop the blockexc blockexc ## - await b.discovery.stop() - await b.advertiser.stop() + await self.trackedFutures.cancelTracked() + await self.network.stop() + await self.discovery.stop() + await self.advertiser.stop() trace "NetworkStore stop" - if not b.blockexcRunning: + if not self.blockexcRunning: warn "Stopping blockexc without starting it" return - b.blockexcRunning = false - await b.trackedFutures.cancelTracked() + self.blockexcRunning = false trace "NetworkStore stopped" proc sendWantHave( - b: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx] + self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx] ): Future[void] {.async.} = for p in peers: let toAsk = addresses.filterIt(it notin p.peerHave) trace "Sending wantHave request", toAsk, peer = p.id - await b.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave) + await self.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave) codex_block_exchange_want_have_lists_sent.inc() proc sendWantBlock( - b: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx + self: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx ): Future[void] {.async.} = trace "Sending wantBlock request to", addresses, peer = blockPeer.id - await b.network.request.sendWantList( + await self.network.request.sendWantList( blockPeer.id, addresses, wantType = WantType.WantBlock ) # we want this remote to send us a block codex_block_exchange_want_block_lists_sent.inc() -proc monitorBlockHandle( - b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId -) {.async.} = +proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = + Rng.instance.sample(peers) + +proc downloadInternal( + self: BlockExcEngine, address: BlockAddress +) {.async: (raises: []).} = + logScope: + address = address + + let handle = self.pendingBlocks.getWantHandle(address) + trace "Downloading block" try: - discard await handle + while address in self.pendingBlocks: + logScope: + retries = self.pendingBlocks.retries(address) + interval = self.pendingBlocks.retryInterval + + if self.pendingBlocks.retriesExhausted(address): + trace "Error retries exhausted" + handle.fail(newException(RetriesExhaustedError, "Error retries exhausted")) + break + + trace "Running retry handle" + let peers = self.peers.getPeersForBlock(address) + logScope: + peersWith = peers.with.len + peersWithout = peers.without.len + + trace "Peers for block" + if peers.with.len > 0: + self.pendingBlocks.setInFlight(address, true) + await self.sendWantBlock(@[address], peers.with.randomPeer) + else: + self.pendingBlocks.setInFlight(address, false) + if peers.without.len > 0: + await self.sendWantHave(@[address], peers.without) + self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + + await (handle or sleepAsync(self.pendingBlocks.retryInterval)) + self.pendingBlocks.decRetries(address) + + if handle.finished: + trace "Handle for block finished", failed = handle.failed + break except CancelledError as exc: - trace "Block handle cancelled", address, peerId + trace "Block download cancelled" + if not handle.finished: + await handle.cancelAndWait() except CatchableError as exc: - warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId - - # TODO: really, this is just a quick and dirty way of - # preventing hitting the same "bad" peer every time, however, - # we might as well discover this on or next iteration, so - # it doesn't mean that we're never talking to this peer again. - # TODO: we need a lot more work around peer selection and - # prioritization - - # drop unresponsive peer - await b.network.switch.disconnect(peerId) - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - -proc pickPseudoRandom( - address: BlockAddress, peers: seq[BlockExcPeerCtx] -): BlockExcPeerCtx = - return peers[hash(address) mod peers.len] + warn "Error downloadloading block", exc = exc.msg + if not handle.finished: + handle.fail(exc) + finally: + self.pendingBlocks.setInFlight(address, false) proc requestBlock*( - b: BlockExcEngine, address: BlockAddress -): Future[?!Block] {.async.} = - let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) + self: BlockExcEngine, address: BlockAddress +): Future[?!Block] {.async: (raises: [CancelledError]).} = + if address notin self.pendingBlocks: + self.trackedFutures.track(self.downloadInternal(address)) - if not b.pendingBlocks.isInFlight(address): - let peers = b.peers.getPeersForBlock(address) - - if peers.with.len == 0: - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - else: - let selected = pickPseudoRandom(address, peers.with) - asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) - b.pendingBlocks.setInFlight(address) - await b.sendWantBlock(@[address], selected) - - await b.sendWantHave(@[address], peers.without) - - # Don't let timeouts bubble up. We can't be too broad here or we break - # cancellations. try: - success await blockFuture - except AsyncTimeoutError as err: + let handle = self.pendingBlocks.getWantHandle(address) + success await handle + except CancelledError as err: + warn "Block request cancelled", address + raise err + except CatchableError as err: + error "Block request failed", address, err = err.msg failure err -proc requestBlock*(b: BlockExcEngine, cid: Cid): Future[?!Block] = - b.requestBlock(BlockAddress.init(cid)) +proc requestBlock*( + self: BlockExcEngine, cid: Cid +): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} = + self.requestBlock(BlockAddress.init(cid)) proc blockPresenceHandler*( - b: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] + self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] ) {.async.} = + trace "Received block presence from peer", peer, blocks = blocks.mapIt($it) let - peerCtx = b.peers.get(peer) - wantList = toSeq(b.pendingBlocks.wantList) + peerCtx = self.peers.get(peer) + ourWantList = toSeq(self.pendingBlocks.wantList) if peerCtx.isNil: return @@ -228,82 +244,99 @@ proc blockPresenceHandler*( let peerHave = peerCtx.peerHave - dontWantCids = peerHave.filterIt(it notin wantList) + dontWantCids = peerHave.filterIt(it notin ourWantList) if dontWantCids.len > 0: peerCtx.cleanPresence(dontWantCids) - let wantCids = wantList.filterIt(it in peerHave) + let ourWantCids = ourWantList.filter do(address: BlockAddress) -> bool: + if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and + not self.pendingBlocks.isInFlight(address): + self.pendingBlocks.setInFlight(address, true) + self.pendingBlocks.decRetries(address) + true + else: + false - if wantCids.len > 0: - trace "Peer has blocks in our wantList", peer, wants = wantCids - await b.sendWantBlock(wantCids, peerCtx) + if ourWantCids.len > 0: + trace "Peer has blocks in our wantList", peer, wants = ourWantCids + await self.sendWantBlock(ourWantCids, peerCtx) - # if none of the connected peers report our wants in their have list, - # fire up discovery - b.discovery.queueFindBlocksReq( - toSeq(b.pendingBlocks.wantListCids).filter do(cid: Cid) -> bool: - not b.peers.anyIt(cid in it.peerHaveCids) - ) - -proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = +proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = let cids = blocksDelivery.mapIt(it.blk.cid) # schedule any new peers to provide blocks to - for p in b.peers: + for p in self.peers: for c in cids: # for each cid # schedule a peer if it wants at least one cid # and we have it in our local store if c in p.peerWantsCids: - if await (c in b.localStore): - if b.scheduleTask(p): + if await (c in self.localStore): + if self.scheduleTask(p): trace "Task scheduled for peer", peer = p.id else: warn "Unable to schedule task for peer", peer = p.id break # do next peer -proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = +proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = ## Tells neighboring peers that we're no longer interested in a block. - trace "Sending block request cancellations to peers", - addrs, peers = b.peers.mapIt($it.id) + ## - let failed = ( - await allFinished( - b.peers.mapIt( - b.network.request.sendWantCancellations(peer = it.id, addresses = addrs) + if self.peers.len == 0: + return + + trace "Sending block request cancellations to peers", + addrs, peers = self.peers.peerIds + + proc mapPeers(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} = + let blocks = addrs.filter do(a: BlockAddress) -> bool: + a in peerCtx.blocks + + if blocks.len > 0: + trace "Sending block request cancellations to peer", peer = peerCtx.id, blocks + await self.network.request.sendWantCancellations( + peer = peerCtx.id, addresses = blocks ) - ) - ).filterIt(it.failed) + peerCtx.cleanPresence(addrs) + peerCtx + + let failed = (await allFinished(map(toSeq(self.peers.peers.values), mapPeers))).filterIt( + it.failed + ) if failed.len > 0: warn "Failed to send block request cancellations to peers", peers = failed.len + else: + trace "Block request cancellations sent to peers", peers = self.peers.len -proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = - b.pendingBlocks.resolve(blocksDelivery) - await b.scheduleTasks(blocksDelivery) - await b.cancelBlocks(blocksDelivery.mapIt(it.address)) +proc resolveBlocks*( + self: BlockExcEngine, blocksDelivery: seq[BlockDelivery] +) {.async.} = + self.pendingBlocks.resolve(blocksDelivery) + await self.scheduleTasks(blocksDelivery) + await self.cancelBlocks(blocksDelivery.mapIt(it.address)) -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = - await b.resolveBlocks( +proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} = + await self.resolveBlocks( blocks.mapIt( BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)) ) ) proc payForBlocks( - engine: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery] + self: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery] ) {.async.} = let - sendPayment = engine.network.request.sendPayment + sendPayment = self.network.request.sendPayment price = peer.price(blocksDelivery.mapIt(it.address)) - if payment =? engine.wallet.pay(peer, price): + if payment =? self.wallet.pay(peer, price): trace "Sending payment for blocks", price, len = blocksDelivery.len await sendPayment(peer.id, payment) -proc validateBlockDelivery(b: BlockExcEngine, bd: BlockDelivery): ?!void = - if bd.address notin b.pendingBlocks: +proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void = + if bd.address notin self.pendingBlocks: return failure("Received block is not currently a pending block") if bd.address.leaf: @@ -333,7 +366,7 @@ proc validateBlockDelivery(b: BlockExcEngine, bd: BlockDelivery): ?!void = return success() proc blocksDeliveryHandler*( - b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] + self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] ) {.async.} = trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) @@ -343,11 +376,11 @@ proc blocksDeliveryHandler*( peer = peer address = bd.address - if err =? b.validateBlockDelivery(bd).errorOption: + if err =? self.validateBlockDelivery(bd).errorOption: warn "Block validation failed", msg = err.msg continue - if err =? (await b.localStore.putBlock(bd.blk)).errorOption: + if err =? (await self.localStore.putBlock(bd.blk)).errorOption: error "Unable to store block", err = err.msg continue @@ -356,7 +389,7 @@ proc blocksDeliveryHandler*( error "Proof expected for a leaf block delivery" continue if err =? ( - await b.localStore.putCidAndProof( + await self.localStore.putCidAndProof( bd.address.treeCid, bd.address.index, bd.blk.cid, proof ) ).errorOption: @@ -365,18 +398,22 @@ proc blocksDeliveryHandler*( validatedBlocksDelivery.add(bd) - await b.resolveBlocks(validatedBlocksDelivery) + await self.resolveBlocks(validatedBlocksDelivery) codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) - let peerCtx = b.peers.get(peer) + let peerCtx = self.peers.get(peer) if peerCtx != nil: - await b.payForBlocks(peerCtx, blocksDelivery) + await self.payForBlocks(peerCtx, blocksDelivery) ## shouldn't we remove them from the want-list instead of this: peerCtx.cleanPresence(blocksDelivery.mapIt(it.address)) -proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} = - let peerCtx = b.peers.get(peer) +proc wantListHandler*( + self: BlockExcEngine, peer: PeerId, wantList: WantList +) {.async.} = + trace "Received want list from peer", peer, wantList = wantList.entries.len + + let peerCtx = self.peers.get(peer) if peerCtx.isNil: return @@ -395,9 +432,14 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy if idx < 0: # Adding new entry to peer wants let - have = await e.address in b.localStore - price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) + have = await e.address in self.localStore + price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) + if e.cancel: + trace "Received cancelation for untracked block, skipping", address = e.address + continue + + trace "Processing want list entry", wantList = $e case e.wantType of WantType.WantHave: if have: @@ -413,7 +455,6 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy address: e.address, `type`: BlockPresenceType.DontHave, price: price ) ) - peerCtx.peerWants.add(e) codex_block_exchange_want_have_lists_received.inc() of WantType.WantBlock: @@ -425,73 +466,76 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy if e.cancel: trace "Canceling want for block", address = e.address peerCtx.peerWants.del(idx) + trace "Canceled block request", address = e.address, len = peerCtx.peerWants.len else: + if e.wantType == WantType.WantBlock: + schedulePeer = true # peer might want to ask for the same cid with # different want params trace "Updating want for block", address = e.address peerCtx.peerWants[idx] = e # update entry + trace "Updated block request", address = e.address, len = peerCtx.peerWants.len if presence.len > 0: trace "Sending presence to remote", items = presence.mapIt($it).join(",") - await b.network.request.sendPresence(peer, presence) + await self.network.request.sendPresence(peer, presence) - if schedulePeer: - if not b.scheduleTask(peerCtx): - warn "Unable to schedule task for peer", peer + if schedulePeer and not self.scheduleTask(peerCtx): + warn "Unable to schedule task for peer", peer -proc accountHandler*(engine: BlockExcEngine, peer: PeerId, account: Account) {.async.} = - let context = engine.peers.get(peer) +proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.async.} = + let context = self.peers.get(peer) if context.isNil: return context.account = account.some proc paymentHandler*( - engine: BlockExcEngine, peer: PeerId, payment: SignedState + self: BlockExcEngine, peer: PeerId, payment: SignedState ) {.async.} = trace "Handling payments", peer - without context =? engine.peers.get(peer).option and account =? context.account: + without context =? self.peers.get(peer).option and account =? context.account: trace "No context or account for peer", peer return if channel =? context.paymentChannel: let sender = account.address - discard engine.wallet.acceptPayment(channel, Asset, sender, payment) + discard self.wallet.acceptPayment(channel, Asset, sender, payment) else: - context.paymentChannel = engine.wallet.acceptChannel(payment).option + context.paymentChannel = self.wallet.acceptChannel(payment).option -proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = +proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} = ## Perform initial setup, such as want ## list exchange ## trace "Setting up peer", peer - if peer notin b.peers: + if peer notin self.peers: trace "Setting up new peer", peer - b.peers.add(BlockExcPeerCtx(id: peer)) - trace "Added peer", peers = b.peers.len + self.peers.add(BlockExcPeerCtx(id: peer)) + trace "Added peer", peers = self.peers.len # broadcast our want list, the other peer will do the same - if b.pendingBlocks.wantListLen > 0: + if self.pendingBlocks.wantListLen > 0: trace "Sending our want list to a peer", peer - let cids = toSeq(b.pendingBlocks.wantList) - await b.network.request.sendWantList(peer, cids, full = true) + let cids = toSeq(self.pendingBlocks.wantList) + await self.network.request.sendWantList(peer, cids, full = true) - if address =? b.pricing .? address: - await b.network.request.sendAccount(peer, Account(address: address)) + if address =? self.pricing .? address: + await self.network.request.sendAccount(peer, Account(address: address)) -proc dropPeer*(b: BlockExcEngine, peer: PeerId) = +proc dropPeer*(self: BlockExcEngine, peer: PeerId) = ## Cleanup disconnected peer ## trace "Dropping peer", peer # drop the peer from the peers table - b.peers.remove(peer) + self.peers.remove(peer) -proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = +proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # Send to the peer blocks he wants to get, # if they present in our local store @@ -514,14 +558,14 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = if e.address.leaf: - (await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( + (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( (blkAndProof: (Block, CodexProof)) => BlockDelivery( address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some ) ) else: - (await b.localStore.getBlock(e.address)).map( + (await self.localStore.getBlock(e.address)).map( (blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none) ) @@ -540,22 +584,22 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = if blocksDelivery.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) - await b.network.request.sendBlocksDelivery(task.id, blocksDelivery) + await self.network.request.sendBlocksDelivery(task.id, blocksDelivery) codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) task.peerWants.keepItIf(it.address notin successAddresses) -proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} = +proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = ## process tasks ## trace "Starting blockexc task runner" - while b.blockexcRunning: + while self.blockexcRunning: try: - let peerCtx = await b.taskQueue.pop() + let peerCtx = await self.taskQueue.pop() - await b.taskHandler(peerCtx) + await self.taskHandler(peerCtx) except CancelledError: break # do not propagate as blockexcTaskRunner was asyncSpawned except CatchableError as e: @@ -573,55 +617,51 @@ proc new*( peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, concurrentTasks = DefaultConcurrentTasks, - peersPerRequest = DefaultMaxPeersPerRequest, - blockFetchTimeout = DefaultBlockTimeout, ): BlockExcEngine = ## Create new block exchange engine instance ## - let engine = BlockExcEngine( + let self = BlockExcEngine( localStore: localStore, peers: peerStore, pendingBlocks: pendingBlocks, - peersPerRequest: peersPerRequest, network: network, wallet: wallet, concurrentTasks: concurrentTasks, - trackedFutures: TrackedFutures.new(), + trackedFutures: TrackedFutures(), taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), discovery: discovery, advertiser: advertiser, - blockFetchTimeout: blockFetchTimeout, ) proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: - await engine.setupPeer(peerId) + await self.setupPeer(peerId) else: - engine.dropPeer(peerId) + self.dropPeer(peerId) if not isNil(network.switch): network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) proc blockWantListHandler(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} = - engine.wantListHandler(peer, wantList) + self.wantListHandler(peer, wantList) proc blockPresenceHandler( peer: PeerId, presence: seq[BlockPresence] ): Future[void] {.gcsafe.} = - engine.blockPresenceHandler(peer, presence) + self.blockPresenceHandler(peer, presence) proc blocksDeliveryHandler( peer: PeerId, blocksDelivery: seq[BlockDelivery] ): Future[void] {.gcsafe.} = - engine.blocksDeliveryHandler(peer, blocksDelivery) + self.blocksDeliveryHandler(peer, blocksDelivery) proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = - engine.accountHandler(peer, account) + self.accountHandler(peer, account) proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = - engine.paymentHandler(peer, payment) + self.paymentHandler(peer, payment) network.handlers = BlockExcHandlers( onWantList: blockWantListHandler, @@ -631,4 +671,4 @@ proc new*( onPayment: paymentHandler, ) - return engine + return self diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 3b69e2d2..f169f744 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -7,13 +7,11 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/tables import std/monotimes - -import pkg/upraises - -push: - {.upraises: [].} +import std/strutils import pkg/chronos import pkg/libp2p @@ -34,66 +32,76 @@ declareGauge( codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us" ) -const DefaultBlockTimeout* = 10.minutes +const + DefaultBlockRetries* = 3000 + DefaultRetryInterval* = 500.millis type + RetriesExhaustedError* = object of CatchableError + BlockHandle* = Future[Block].Raising([CancelledError, RetriesExhaustedError]) + BlockReq* = object - handle*: Future[Block] + handle*: BlockHandle inFlight*: bool + blockRetries*: int startTime*: int64 PendingBlocksManager* = ref object of RootObj + blockRetries*: int = DefaultBlockRetries + retryInterval*: Duration = DefaultRetryInterval blocks*: Table[BlockAddress, BlockReq] # pending Block requests proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) proc getWantHandle*( - p: PendingBlocksManager, - address: BlockAddress, - timeout = DefaultBlockTimeout, - inFlight = false, -): Future[Block] {.async.} = + self: PendingBlocksManager, address: BlockAddress, inFlight = false +): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = ## Add an event for a block ## - try: - if address notin p.blocks: - p.blocks[address] = BlockReq( - handle: newFuture[Block]("pendingBlocks.getWantHandle"), - inFlight: inFlight, - startTime: getMonoTime().ticks, - ) + self.blocks.withValue(address, blk): + return blk[].handle + do: + let blk = BlockReq( + handle: newFuture[Block]("pendingBlocks.getWantHandle"), + inFlight: inFlight, + blockRetries: self.blockRetries, + startTime: getMonoTime().ticks, + ) + self.blocks[address] = blk + let handle = blk.handle - p.updatePendingBlockGauge() - return await p.blocks[address].handle.wait(timeout) - except CancelledError as exc: - trace "Blocks cancelled", exc = exc.msg, address - raise exc - except CatchableError as exc: - error "Pending WANT failed or expired", exc = exc.msg - # no need to cancel, it is already cancelled by wait() - raise exc - finally: - p.blocks.del(address) - p.updatePendingBlockGauge() + proc cleanUpBlock(data: pointer) {.raises: [].} = + self.blocks.del(address) + self.updatePendingBlockGauge() + + handle.addCallback(cleanUpBlock) + handle.cancelCallback = proc(data: pointer) {.raises: [].} = + if not handle.finished: + handle.removeCallback(cleanUpBlock) + cleanUpBlock(nil) + + self.updatePendingBlockGauge() + return handle proc getWantHandle*( - p: PendingBlocksManager, cid: Cid, timeout = DefaultBlockTimeout, inFlight = false -): Future[Block] = - p.getWantHandle(BlockAddress.init(cid), timeout, inFlight) + self: PendingBlocksManager, cid: Cid, inFlight = false +): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = + self.getWantHandle(BlockAddress.init(cid), inFlight) proc resolve*( - p: PendingBlocksManager, blocksDelivery: seq[BlockDelivery] + self: PendingBlocksManager, blocksDelivery: seq[BlockDelivery] ) {.gcsafe, raises: [].} = ## Resolve pending blocks ## for bd in blocksDelivery: - p.blocks.withValue(bd.address, blockReq): - if not blockReq.handle.finished: + self.blocks.withValue(bd.address, blockReq): + if not blockReq[].handle.finished: + trace "Resolving pending block", address = bd.address let - startTime = blockReq.startTime + startTime = blockReq[].startTime stopTime = getMonoTime().ticks retrievalDurationUs = (stopTime - startTime) div 1000 @@ -106,52 +114,70 @@ proc resolve*( else: trace "Block handle already finished", address = bd.address -proc setInFlight*(p: PendingBlocksManager, address: BlockAddress, inFlight = true) = +func retries*(self: PendingBlocksManager, address: BlockAddress): int = + self.blocks.withValue(address, pending): + result = pending[].blockRetries + do: + result = 0 + +func decRetries*(self: PendingBlocksManager, address: BlockAddress) = + self.blocks.withValue(address, pending): + pending[].blockRetries -= 1 + +func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool = + self.blocks.withValue(address, pending): + result = pending[].blockRetries <= 0 + +func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) = ## Set inflight status for a block ## - p.blocks.withValue(address, pending): + self.blocks.withValue(address, pending): pending[].inFlight = inFlight -proc isInFlight*(p: PendingBlocksManager, address: BlockAddress): bool = +func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool = ## Check if a block is in flight ## - p.blocks.withValue(address, pending): + self.blocks.withValue(address, pending): result = pending[].inFlight -proc contains*(p: PendingBlocksManager, cid: Cid): bool = - BlockAddress.init(cid) in p.blocks +func contains*(self: PendingBlocksManager, cid: Cid): bool = + BlockAddress.init(cid) in self.blocks -proc contains*(p: PendingBlocksManager, address: BlockAddress): bool = - address in p.blocks +func contains*(self: PendingBlocksManager, address: BlockAddress): bool = + address in self.blocks -iterator wantList*(p: PendingBlocksManager): BlockAddress = - for a in p.blocks.keys: +iterator wantList*(self: PendingBlocksManager): BlockAddress = + for a in self.blocks.keys: yield a -iterator wantListBlockCids*(p: PendingBlocksManager): Cid = - for a in p.blocks.keys: +iterator wantListBlockCids*(self: PendingBlocksManager): Cid = + for a in self.blocks.keys: if not a.leaf: yield a.cid -iterator wantListCids*(p: PendingBlocksManager): Cid = +iterator wantListCids*(self: PendingBlocksManager): Cid = var yieldedCids = initHashSet[Cid]() - for a in p.blocks.keys: + for a in self.blocks.keys: let cid = a.cidOrTreeCid if cid notin yieldedCids: yieldedCids.incl(cid) yield cid -iterator wantHandles*(p: PendingBlocksManager): Future[Block] = - for v in p.blocks.values: +iterator wantHandles*(self: PendingBlocksManager): Future[Block] = + for v in self.blocks.values: yield v.handle -proc wantListLen*(p: PendingBlocksManager): int = - p.blocks.len +proc wantListLen*(self: PendingBlocksManager): int = + self.blocks.len -func len*(p: PendingBlocksManager): int = - p.blocks.len +func len*(self: PendingBlocksManager): int = + self.blocks.len -func new*(T: type PendingBlocksManager): PendingBlocksManager = - PendingBlocksManager() +func new*( + T: type PendingBlocksManager, + retries = DefaultBlockRetries, + interval = DefaultRetryInterval, +): PendingBlocksManager = + PendingBlocksManager(blockRetries: retries, retryInterval: interval) diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index ecb72890..daf358de 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -21,17 +21,18 @@ import ../../blocktype as bt import ../../logutils import ../protobuf/blockexc as pb import ../protobuf/payments +import ../../utils/trackedfutures import ./networkpeer -export network, payments +export networkpeer, payments logScope: topics = "codex blockexcnetwork" const Codec* = "/codex/blockexc/1.0.0" - MaxInflight* = 100 + DefaultMaxInflight* = 100 type WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} @@ -82,6 +83,8 @@ type request*: BlockExcRequest getConn: ConnProvider inflightSema: AsyncSemaphore + maxInflight: int = DefaultMaxInflight + trackedFutures*: TrackedFutures = TrackedFutures() proc peerId*(b: BlockExcNetwork): PeerId = ## Return peer id @@ -220,23 +223,25 @@ proc handlePayment( if not network.handlers.onPayment.isNil: await network.handlers.onPayment(peer.id, payment) -proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.raises: [].} = +proc rpcHandler( + b: BlockExcNetwork, peer: NetworkPeer, msg: Message +) {.async: (raises: [CatchableError]).} = ## handle rpc messages ## if msg.wantList.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantList) + b.trackedFutures.track(b.handleWantList(peer, msg.wantList)) if msg.payload.len > 0: - asyncSpawn b.handleBlocksDelivery(peer, msg.payload) + b.trackedFutures.track(b.handleBlocksDelivery(peer, msg.payload)) if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + b.trackedFutures.track(b.handleBlockPresence(peer, msg.blockPresences)) if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) + b.trackedFutures.track(b.handleAccount(peer, account)) if payment =? SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) + b.trackedFutures.track(b.handlePayment(peer, payment)) proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer @@ -247,6 +252,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = try: + trace "Getting new connection stream", peer return await b.switch.dial(peer, Codec) except CancelledError as error: raise error @@ -256,8 +262,10 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async.} = - b.rpcHandler(p, msg) + let rpcHandler = proc( + p: NetworkPeer, msg: Message + ) {.async: (raises: [CatchableError]).} = + await b.rpcHandler(p, msg) # create new pubsub peer let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) @@ -282,48 +290,61 @@ proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = trace "Skipping dialing self", peer = peer.peerId return + if peer.peerId in b.peers: + trace "Already connected to peer", peer = peer.peerId + return + await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = ## Cleanup disconnected peer ## + trace "Dropping peer", peer b.peers.del(peer) -method init*(b: BlockExcNetwork) = +method init*(self: BlockExcNetwork) = ## Perform protocol initialization ## proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: - b.setupPeer(peerId) + self.setupPeer(peerId) else: - b.dropPeer(peerId) + self.dropPeer(peerId) - b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) - b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) + self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + proc handler(conn: Connection, proto: string) {.async.} = let peerId = conn.peerId - let blockexcPeer = b.getOrCreatePeer(peerId) + let blockexcPeer = self.getOrCreatePeer(peerId) await blockexcPeer.readLoop(conn) # attach read loop - b.handler = handle - b.codec = Codec + self.handler = handler + self.codec = Codec + +proc stop*(self: BlockExcNetwork) {.async: (raises: []).} = + await self.trackedFutures.cancelTracked() proc new*( T: type BlockExcNetwork, switch: Switch, connProvider: ConnProvider = nil, - maxInflight = MaxInflight, + maxInflight = DefaultMaxInflight, ): BlockExcNetwork = ## Create a new BlockExcNetwork instance ## let self = BlockExcNetwork( - switch: switch, getConn: connProvider, inflightSema: newAsyncSemaphore(maxInflight) + switch: switch, + getConn: connProvider, + inflightSema: newAsyncSemaphore(maxInflight), + maxInflight: maxInflight, ) + self.maxIncomingStreams = self.maxInflight + proc sendWantList( id: PeerId, cids: seq[BlockAddress], diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 90c538ea..4a100340 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -22,39 +22,56 @@ import ../../logutils logScope: topics = "codex blockexcnetworkpeer" +const DefaultYieldInterval = 50.millis + type ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.} - RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} + RPCHandler* = proc( + peer: NetworkPeer, msg: Message + ): Future[void].Raising(CatchableError) {.gcsafe.} NetworkPeer* = ref object of RootObj id*: PeerId handler*: RPCHandler sendConn: Connection getConn: ConnProvider + yieldInterval*: Duration = DefaultYieldInterval proc connected*(b: NetworkPeer): bool = not (isNil(b.sendConn)) and not (b.sendConn.closed or b.sendConn.atEof) proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = if isNil(conn): + trace "No connection to read from", peer = b.id return + trace "Attaching read loop", peer = b.id, connId = conn.oid try: + var nextYield = Moment.now() + b.yieldInterval while not conn.atEof or not conn.closed: + if Moment.now() > nextYield: + nextYield = Moment.now() + b.yieldInterval + trace "Yielding in read loop", + peer = b.id, nextYield = nextYield, interval = b.yieldInterval + await sleepAsync(10.millis) + let data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() + trace "Received message", peer = b.id, connId = conn.oid await b.handler(b, msg) except CancelledError: trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: + trace "Detaching read loop", peer = b.id, connId = conn.oid await conn.close() proc connect*(b: NetworkPeer): Future[Connection] {.async.} = if b.connected: + trace "Already connected", peer = b.id, connId = b.sendConn.oid return b.sendConn b.sendConn = await b.getConn() @@ -68,17 +85,9 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = warn "Unable to get send connection for peer message not sent", peer = b.id return + trace "Sending message", peer = b.id, connId = conn.oid await conn.writeLp(protobufEncode(msg)) -proc broadcast*(b: NetworkPeer, msg: Message) = - proc sendAwaiter() {.async.} = - try: - await b.send(msg) - except CatchableError as exc: - warn "Exception broadcasting message to peer", peer = b.id, exc = exc.msg - - asyncSpawn sendAwaiter() - func new*( T: type NetworkPeer, peer: PeerId, diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 7cf167b4..739d92b5 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -10,6 +10,7 @@ import std/sequtils import std/tables import std/algorithm +import std/sequtils import pkg/upraises @@ -33,9 +34,7 @@ type PeerCtxStore* = ref object of RootObj peers*: OrderedTable[PeerId, BlockExcPeerCtx] - PeersForBlock* = object of RootObj - with*: seq[BlockExcPeerCtx] - without*: seq[BlockExcPeerCtx] + PeersForBlock* = tuple[with: seq[BlockExcPeerCtx], without: seq[BlockExcPeerCtx]] iterator items*(self: PeerCtxStore): BlockExcPeerCtx = for p in self.peers.values: @@ -47,6 +46,9 @@ proc contains*(a: openArray[BlockExcPeerCtx], b: PeerId): bool = a.anyIt(it.id == b) +func peerIds*(self: PeerCtxStore): seq[PeerId] = + toSeq(self.peers.keys) + func contains*(self: PeerCtxStore, peerId: PeerId): bool = peerId in self.peers @@ -75,7 +77,7 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid)) proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = - var res = PeersForBlock() + var res: PeersForBlock = (@[], @[]) for peer in self: if peer.peerHave.anyIt(it == address): res.with.add(peer) diff --git a/codex/codex.nim b/codex/codex.nim index dc577373..b8905205 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -311,7 +311,7 @@ proc new*( bufferSize = (1024 * 64), maxRequestBodySize = int.high, ) - .expect("Should start rest server!") + .expect("Should create rest server!") switch.mount(network) diff --git a/codex/conf.nim b/codex/conf.nim index 2a859efb..986a53d6 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -51,8 +51,8 @@ export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig export ValidationGroups, MaxSlots export - DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, - DefaultNumberOfBlocksToMaintainPerInterval, DefaultRequestCacheSize + DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, + DefaultRequestCacheSize type ThreadCount* = distinct Natural @@ -251,15 +251,15 @@ type desc: "Time interval in seconds - determines frequency of block " & "maintenance cycle: how often blocks are checked " & "for expiration and cleanup", - defaultValue: DefaultBlockMaintenanceInterval, - defaultValueDesc: $DefaultBlockMaintenanceInterval, + defaultValue: DefaultBlockInterval, + defaultValueDesc: $DefaultBlockInterval, name: "block-mi" .}: Duration blockMaintenanceNumberOfBlocks* {. desc: "Number of blocks to check every maintenance cycle", - defaultValue: DefaultNumberOfBlocksToMaintainPerInterval, - defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval, + defaultValue: DefaultNumBlocksPerInterval, + defaultValueDesc: $DefaultNumBlocksPerInterval, name: "block-mn" .}: int diff --git a/codex/logutils.nim b/codex/logutils.nim index b37f6952..e9604aba 100644 --- a/codex/logutils.nim +++ b/codex/logutils.nim @@ -152,7 +152,7 @@ proc formatTextLineSeq*(val: seq[string]): string = template formatIt*(format: LogFormat, T: typedesc, body: untyped) = # Provides formatters for logging with Chronicles for the given type and # `LogFormat`. - # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overriddden + # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overridden # since the base `setProperty` is generic using `auto` and conflicts with # providing a generic `seq` and `Option` override. when format == LogFormat.json: diff --git a/codex/node.nim b/codex/node.nim index e1647f3e..b0f66c90 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -45,13 +45,14 @@ import ./utils import ./errors import ./logutils import ./utils/asynciter +import ./utils/trackedfutures export logutils logScope: topics = "codex node" -const FetchBatch = 200 +const DefaultFetchBatch = 10 type Contracts* = @@ -72,6 +73,7 @@ type clock*: Clock storage*: Contracts taskpool: Taskpool + trackedFutures: TrackedFutures CodexNodeRef* = ref CodexNode @@ -163,8 +165,9 @@ proc fetchBatched*( self: CodexNodeRef, cid: Cid, iter: Iter[int], - batchSize = FetchBatch, + batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, + fetchLocal = true, ): Future[?!void] {.async, gcsafe.} = ## Fetch blocks in batches of `batchSize` ## @@ -179,7 +182,9 @@ proc fetchBatched*( let blocks = collect: for i in 0 ..< batchSize: if not iter.finished: - self.networkStore.getBlock(BlockAddress.init(cid, iter.next())) + let address = BlockAddress.init(cid, iter.next()) + if not (await address in self.networkStore) or fetchLocal: + self.networkStore.getBlock(address) if blocksErr =? (await allFutureResult(blocks)).errorOption: return failure(blocksErr) @@ -188,21 +193,25 @@ proc fetchBatched*( batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption: return failure(batchErr) + await sleepAsync(1.millis) + success() proc fetchBatched*( self: CodexNodeRef, manifest: Manifest, - batchSize = FetchBatch, + batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, + fetchLocal = true, ): Future[?!void] = ## Fetch manifest in batches of `batchSize` ## - trace "Fetching blocks in batches of", size = batchSize + trace "Fetching blocks in batches of", + size = batchSize, blocksCount = manifest.blocksCount let iter = Iter[int].new(0 ..< manifest.blocksCount) - self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch) + self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal) proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = ## Streams the contents of a single block. @@ -223,35 +232,64 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async finally: await stream.pushEof() - asyncSpawn streamOneBlock() + self.trackedFutures.track(streamOneBlock()) LPStream(stream).success proc streamEntireDataset( - self: CodexNodeRef, manifest: Manifest, manifestCid: Cid + self: CodexNodeRef, + manifest: Manifest, + manifestCid: Cid, + prefetchBatch = DefaultFetchBatch, ): Future[?!LPStream] {.async.} = ## Streams the contents of the entire dataset described by the manifest. + ## Background jobs (erasure decoding and prefetching) will be cancelled when + ## the stream is closed. ## trace "Retrieving blocks from manifest", manifestCid + let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false)) + var jobs: seq[Future[void]] + if manifest.protected: # Retrieve, decode and save to the local store all EС groups - proc erasureJob(): Future[?!void] {.async.} = - # Spawn an erasure decoding job - let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool - ) - without _ =? (await erasure.decode(manifest)), error: - error "Unable to erasure decode manifest", manifestCid, exc = error.msg - return failure(error) + proc erasureJob(): Future[void] {.async.} = + try: + # Spawn an erasure decoding job + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) + without _ =? (await erasure.decode(manifest)), error: + error "Unable to erasure decode manifest", manifestCid, exc = error.msg + except CancelledError: + trace "Erasure job cancelled", manifestCid + except CatchableError as exc: + trace "Error erasure decoding manifest", manifestCid, exc = exc.msg - return success() + jobs.add(erasureJob()) - if err =? (await erasureJob()).errorOption: - return failure(err) + proc prefetch(): Future[void] {.async.} = + try: + if err =? + (await self.fetchBatched(manifest, prefetchBatch, fetchLocal = false)).errorOption: + error "Unable to fetch blocks", err = err.msg + except CancelledError: + trace "Prefetch job cancelled" + except CatchableError as exc: + error "Error fetching blocks", exc = exc.msg + + jobs.add(prefetch()) + + # Monitor stream completion and cancel background jobs when done + proc monitorStream() {.async.} = + try: + await stream.join() + finally: + await allFutures(jobs.mapIt(it.cancelAndWait)) + + self.trackedFutures.track(monitorStream()) - # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid - LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success + stream.success proc retrieve*( self: CodexNodeRef, cid: Cid, local: bool = true @@ -758,6 +796,11 @@ proc start*(self: CodexNodeRef) {.async.} = proc stop*(self: CodexNodeRef) {.async.} = trace "Stopping node" + if not self.taskpool.isNil: + self.taskpool.shutdown() + + await self.trackedFutures.cancelTracked() + if not self.engine.isNil: await self.engine.stop() @@ -779,9 +822,6 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close - if not self.taskpool.isNil: - self.taskpool.shutdown() - proc new*( T: type CodexNodeRef, switch: Switch, @@ -803,4 +843,5 @@ proc new*( discovery: discovery, taskPool: taskpool, contracts: contracts, + trackedFutures: TrackedFutures(), ) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index e5c8d195..89dbe220 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -13,8 +13,8 @@ push: {.upraises: [].} import std/sequtils -import mimetypes -import os +import std/mimetypes +import std/os import pkg/questionable import pkg/questionable/results @@ -120,7 +120,7 @@ proc retrieveCid( await resp.finish() codex_api_downloads.inc() except CatchableError as exc: - warn "Excepting streaming blocks", exc = exc.msg + warn "Error streaming blocks", exc = exc.msg resp.status = Http500 return await resp.sendBody("") finally: diff --git a/codex/rng.nim b/codex/rng.nim index 9d82156e..866d65f8 100644 --- a/codex/rng.nim +++ b/codex/rng.nim @@ -55,6 +55,15 @@ proc sample*[T]( break +proc sample*[T]( + rng: Rng, sample: openArray[T], limit: int +): seq[T] {.raises: [Defect, RngSampleError].} = + if limit > sample.len: + raise newException(RngSampleError, "Limit cannot be larger than sample!") + + for _ in 0 ..< min(sample.len, limit): + result.add(rng.sample(sample, result)) + proc shuffle*[T](rng: Rng, a: var openArray[T]) = for i in countdown(a.high, 1): let j = rng.rand(i) diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 74597ff1..30332f1c 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -189,7 +189,7 @@ proc getCellHashes*[T, H]( blkIdx = blkIdx pos = i - trace "Getting block CID for tree at index", index = blkIdx + trace "Getting block CID for tree at index" without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, err: error "Failed to get block CID for tree at index", err = err.msg diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index e7ce1bdf..cced5da9 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -22,8 +22,8 @@ import ../logutils import ../systemclock const - DefaultBlockMaintenanceInterval* = 10.minutes - DefaultNumberOfBlocksToMaintainPerInterval* = 1000 + DefaultBlockInterval* = 10.minutes + DefaultNumBlocksPerInterval* = 1000 type BlockMaintainer* = ref object of RootObj repoStore: RepoStore diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index faee36e1..f94bca33 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -137,6 +137,14 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = trace "Checking network store for block existence", cid return await self.localStore.hasBlock(cid) +method hasBlock*( + self: NetworkStore, tree: Cid, index: Natural +): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore + ## + trace "Checking network store for block existence", tree, index + return await self.localStore.hasBlock(tree, index) + method close*(self: NetworkStore): Future[void] {.async.} = ## Close the underlying local blockstore ## diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim index 3d455d12..42f528e9 100644 --- a/codex/stores/repostore/types.nim +++ b/codex/stores/repostore/types.nim @@ -21,8 +21,8 @@ import ../../systemclock import ../../units const - DefaultBlockTtl* = 24.hours - DefaultQuotaBytes* = 8.GiBs + DefaultBlockTtl* = 30.days + DefaultQuotaBytes* = 20.GiBs type QuotaNotEnoughError* = object of CodexError diff --git a/codex/utils/asyncspawn.nim b/codex/utils/asyncspawn.nim deleted file mode 100644 index 95a9f014..00000000 --- a/codex/utils/asyncspawn.nim +++ /dev/null @@ -1,10 +0,0 @@ -import pkg/chronos - -proc asyncSpawn*(future: Future[void], ignore: type CatchableError) = - proc ignoringError() {.async.} = - try: - await future - except ignore: - discard - - asyncSpawn ignoringError() diff --git a/codex/utils/natutils.nim b/codex/utils/natutils.nim index 43909588..996d8dd0 100644 --- a/codex/utils/natutils.nim +++ b/codex/utils/natutils.nim @@ -1,6 +1,7 @@ {.push raises: [].} -import std/[tables, hashes], pkg/results, stew/shims/net as stewNet, chronos, chronicles +import + std/[tables, hashes], pkg/results, pkg/stew/shims/net as stewNet, chronos, chronicles import pkg/libp2p diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index 904703a0..93704726 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -76,7 +76,7 @@ asyncchecksuite "Test Discovery Engine": ) await discoveryEngine.start() - await allFuturesThrowing(allFinished(wants)).wait(1.seconds) + await allFuturesThrowing(allFinished(wants)).wait(100.millis) await discoveryEngine.stop() test "Should queue discovery request": @@ -101,7 +101,7 @@ asyncchecksuite "Test Discovery Engine": await discoveryEngine.start() discoveryEngine.queueFindBlocksReq(@[blocks[0].cid]) - await want.wait(1.seconds) + await want.wait(100.millis) await discoveryEngine.stop() test "Should not request more than minPeersPerBlock": diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index aa15f795..0c250231 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -1,5 +1,6 @@ import std/sequtils import std/algorithm +import std/importutils import pkg/chronos import pkg/stew/byteutils @@ -20,7 +21,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes": peerCtx1, peerCtx2: BlockExcPeerCtx pricing1, pricing2: Pricing blocks1, blocks2: seq[bt.Block] - pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] + pendingBlocks1, pendingBlocks2: seq[BlockHandle] setup: blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) @@ -56,7 +57,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes": nodeCmps2.switch.peerInfo.peerId, nodeCmps2.switch.peerInfo.addrs ) - await sleepAsync(1.seconds) # give some time to exchange lists + await sleepAsync(100.millis) # give some time to exchange lists peerCtx2 = nodeCmps1.peerStore.get(nodeCmps2.switch.peerInfo.peerId) peerCtx1 = nodeCmps2.peerStore.get(nodeCmps1.switch.peerInfo.peerId) @@ -75,7 +76,6 @@ asyncchecksuite "NetworkStore engine - 2 nodes": test "Should exchange blocks on connect": await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds) - await allFuturesThrowing(allFinished(pendingBlocks2)).wait(10.seconds) check: @@ -178,7 +178,7 @@ asyncchecksuite "NetworkStore - multiple nodes": (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() await connectNodes(nodes) - await sleepAsync(1.seconds) + await sleepAsync(100.millis) await allFuturesThrowing(allFinished(pendingBlocks)) @@ -203,45 +203,9 @@ asyncchecksuite "NetworkStore - multiple nodes": (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() await connectNodes(nodes) - await sleepAsync(1.seconds) + await sleepAsync(100.millis) await allFuturesThrowing(allFinished(pendingBlocks1), allFinished(pendingBlocks2)) check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3] check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15] - - test "Should actively cancel want-haves if block received from elsewhere": - let - # Peer wanting to download blocks - downloader = nodes[4] - # Bystander peer - gets block request but can't satisfy them - bystander = nodes[3] - # Holder of actual blocks - blockHolder = nodes[1] - - let aBlock = blocks[0] - (await blockHolder.engine.localStore.putBlock(aBlock)).tryGet() - - await connectNodes(@[downloader, bystander]) - # Downloader asks for block... - let blockRequest = downloader.engine.requestBlock(aBlock.cid) - - # ... and bystander learns that downloader wants it, but can't provide it. - check eventually( - bystander.engine.peers - .get(downloader.switch.peerInfo.peerId).peerWants - .filterIt(it.address == aBlock.address).len == 1 - ) - - # As soon as we connect the downloader to the blockHolder, the block should - # propagate to the downloader... - await connectNodes(@[downloader, blockHolder]) - check (await blockRequest).tryGet().cid == aBlock.cid - check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet() - - # ... and the bystander should have cancelled the want-have - check eventually( - bystander.engine.peers - .get(downloader.switch.peerInfo.peerId).peerWants - .filterIt(it.address == aBlock.address).len == 0 - ) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index f7cc8294..cc5511e8 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -20,6 +20,11 @@ import ../../../asynctest import ../../helpers import ../../examples +const NopSendWantCancellationsProc = proc( + id: PeerId, addresses: seq[BlockAddress] +) {.gcsafe, async.} = + discard + asyncchecksuite "NetworkStore engine basic": var rng: Rng @@ -129,11 +134,6 @@ asyncchecksuite "NetworkStore engine handlers": localStore: BlockStore blocks: seq[Block] - const NopSendWantCancellationsProc = proc( - id: PeerId, addresses: seq[BlockAddress] - ) {.gcsafe, async.} = - discard - setup: rng = Rng.instance() chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) @@ -292,7 +292,8 @@ asyncchecksuite "NetworkStore engine handlers": await done.wait(100.millis) test "Should handle block presence": - var handles: Table[Cid, Future[Block]] + var handles: + Table[Cid, Future[Block].Raising([CancelledError, RetriesExhaustedError])] proc sendWantList( id: PeerId, @@ -333,6 +334,10 @@ asyncchecksuite "NetworkStore engine handlers": blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq) + peerCtx.blocks = blocks.mapIt( + (it.address, Presence(address: it.address, have: true, price: UInt256.example)) + ).toTable + proc sendWantCancellations( id: PeerId, addresses: seq[BlockAddress] ) {.gcsafe, async.} = @@ -344,9 +349,168 @@ asyncchecksuite "NetworkStore engine handlers": ) await engine.blocksDeliveryHandler(peerId, blocksDelivery) - discard await allFinished(pending) + discard await allFinished(pending).wait(100.millis) await allFuturesThrowing(cancellations.values().toSeq) +asyncchecksuite "Block Download": + var + rng: Rng + seckey: PrivateKey + peerId: PeerId + chunker: Chunker + wallet: WalletRef + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + network: BlockExcNetwork + engine: BlockExcEngine + discovery: DiscoveryEngine + advertiser: Advertiser + peerCtx: BlockExcPeerCtx + localStore: BlockStore + blocks: seq[Block] + + setup: + rng = Rng.instance() + chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) + + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(Block.new(chunk).tryGet()) + + seckey = PrivateKey.random(rng[]).tryGet() + peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet() + wallet = WalletRef.example + blockDiscovery = Discovery.new() + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + + localStore = CacheStore.new() + network = BlockExcNetwork() + + discovery = + DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) + + advertiser = Advertiser.new(localStore, blockDiscovery) + + engine = BlockExcEngine.new( + localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks + ) + + peerCtx = BlockExcPeerCtx(id: peerId) + engine.peers.add(peerCtx) + + test "Should exhaust retries": + var + retries = 2 + address = BlockAddress.init(blocks[0].cid) + + proc sendWantList( + id: PeerId, + addresses: seq[BlockAddress], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.WantHave, + full: bool = false, + sendDontHave: bool = false, + ) {.gcsafe, async.} = + check wantType == WantHave + check not engine.pendingBlocks.isInFlight(address) + check engine.pendingBlocks.retries(address) == retries + retries -= 1 + + engine.pendingBlocks.blockRetries = 2 + engine.pendingBlocks.retryInterval = 10.millis + engine.network = + BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList)) + + let pending = engine.requestBlock(address) + + expect RetriesExhaustedError: + discard (await pending).tryGet() + + test "Should retry block request": + let + address = BlockAddress.init(blocks[0].cid) + steps = newAsyncEvent() + + proc sendWantList( + id: PeerId, + addresses: seq[BlockAddress], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.WantHave, + full: bool = false, + sendDontHave: bool = false, + ) {.gcsafe, async.} = + case wantType + of WantHave: + check engine.pendingBlocks.isInFlight(address) == false + check engine.pendingBlocks.retriesExhausted(address) == false + steps.fire() + of WantBlock: + check engine.pendingBlocks.isInFlight(address) == true + check engine.pendingBlocks.retriesExhausted(address) == false + steps.fire() + + engine.pendingBlocks.blockRetries = 10 + engine.pendingBlocks.retryInterval = 10.millis + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc + ) + ) + + let pending = engine.requestBlock(address) + await steps.wait() + + # add blocks presence + peerCtx.blocks = blocks.mapIt( + (it.address, Presence(address: it.address, have: true, price: UInt256.example)) + ).toTable + + steps.clear() + await steps.wait() + + await engine.blocksDeliveryHandler( + peerId, @[BlockDelivery(blk: blocks[0], address: address)] + ) + check (await pending).tryGet() == blocks[0] + + test "Should cancel block request": + var + address = BlockAddress.init(blocks[0].cid) + done = newFuture[void]() + + proc sendWantList( + id: PeerId, + addresses: seq[BlockAddress], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.WantHave, + full: bool = false, + sendDontHave: bool = false, + ) {.gcsafe, async.} = + done.complete() + + engine.pendingBlocks.blockRetries = 10 + engine.pendingBlocks.retryInterval = 1.seconds + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc + ) + ) + + let pending = engine.requestBlock(address) + await done.wait(100.millis) + + pending.cancel() + expect CancelledError: + discard (await pending).tryGet() + asyncchecksuite "Task Handler": var rng: Rng diff --git a/tests/codex/blockexchange/testpendingblocks.nim b/tests/codex/blockexchange/testpendingblocks.nim index 45b065c0..29410db7 100644 --- a/tests/codex/blockexchange/testpendingblocks.nim +++ b/tests/codex/blockexchange/testpendingblocks.nim @@ -28,7 +28,10 @@ checksuite "Pending Blocks": check blk.cid in pendingBlocks pendingBlocks.resolve(@[blk].mapIt(BlockDelivery(blk: it, address: it.address))) - check (await handle) == blk + await sleepAsync(0.millis) + # trigger the event loop, otherwise the block finishes before poll runs + let resolved = await handle + check resolved == blk check blk.cid notin pendingBlocks test "Should cancel want handle": @@ -41,20 +44,6 @@ checksuite "Pending Blocks": await handle.cancelAndWait() check blk.cid notin pendingBlocks - test "Should expire want handle": - let - pendingBlocks = PendingBlocksManager.new() - blk = bt.Block.new("Hello".toBytes).tryGet - handle = pendingBlocks.getWantHandle(blk.cid, 1.millis) - - check blk.cid in pendingBlocks - - await sleepAsync(10.millis) - expect AsyncTimeoutError: - discard await handle - - check blk.cid notin pendingBlocks - test "Should get wants list": let pendingBlocks = PendingBlocksManager.new() @@ -79,3 +68,19 @@ checksuite "Pending Blocks": check: (await allFinished(wantHandles)).mapIt($it.read.cid).sorted(cmp[string]) == (await allFinished(handles)).mapIt($it.read.cid).sorted(cmp[string]) + + test "Should handle retry counters": + let + pendingBlocks = PendingBlocksManager.new(3) + blk = bt.Block.new("Hello".toBytes).tryGet + address = BlockAddress.init(blk.cid) + handle = pendingBlocks.getWantHandle(blk.cid) + + check pendingBlocks.retries(address) == 3 + pendingBlocks.decRetries(address) + check pendingBlocks.retries(address) == 2 + pendingBlocks.decRetries(address) + check pendingBlocks.retries(address) == 1 + pendingBlocks.decRetries(address) + check pendingBlocks.retries(address) == 0 + check pendingBlocks.retriesExhausted(address) diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 2d1a87dc..a28a1f37 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -123,7 +123,7 @@ template setupAndTearDown*() {.dirty.} = ) teardown: - close(file) + file.close() await node.stop() await metaTmp.destroyDb() await repoTmp.destroyDb() diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 0700203d..511badef 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -64,21 +64,6 @@ asyncchecksuite "Test Node - Basic": check: fetched == manifest - test "Should not lookup non-existing blocks twice": - # https://github.com/codex-storage/nim-codex/issues/699 - let - cstore = CountingStore.new(engine, localStore) - node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new()) - missingCid = - Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get() - - engine.blockFetchTimeout = timer.milliseconds(100) - - discard await node.retrieve(missingCid, local = false) - - let lookupCount = cstore.lookups.getOrDefault(missingCid) - check lookupCount == 1 - test "Block Batching": let manifest = await storeDataGetManifest(localStore, chunker) @@ -93,17 +78,15 @@ asyncchecksuite "Test Node - Basic": ) ).tryGet() - test "Store and retrieve Data Stream": + test "Should store Data Stream": let stream = BufferStream.new() storeFut = node.store(stream) - oddChunkSize = math.trunc(DefaultBlockSize.float / 3.14).NBytes # Let's check that node.store can correctly rechunk these odd chunks - oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) - # TODO: doesn't work with pad=tue + oddChunker = FileChunker.new(file = file, chunkSize = 1024.NBytes, pad = false) + # don't pad, so `node.store` gets the correct size var original: seq[byte] - try: while (let chunk = await oddChunker.getBytes(); chunk.len > 0): original &= chunk @@ -116,13 +99,35 @@ asyncchecksuite "Test Node - Basic": manifestCid = (await storeFut).tryGet() manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() localManifest = Manifest.decode(manifestBlock).tryGet() - data = await (await node.retrieve(manifestCid)).drain() + var data: seq[byte] + for i in 0 ..< localManifest.blocksCount: + let blk = (await localStore.getBlock(localManifest.treeCid, i)).tryGet() + data &= blk.data + + data.setLen(localManifest.datasetSize.int) # truncate data to original size check: - data.len == localManifest.datasetSize.int data.len == original.len sha256.digest(data) == sha256.digest(original) + test "Should retrieve a Data Stream": + let + manifest = await storeDataGetManifest(localStore, chunker) + manifestBlk = + bt.Block.new(data = manifest.encode().tryGet, codec = ManifestCodec).tryGet() + + (await localStore.putBlock(manifestBlk)).tryGet() + let data = await ((await node.retrieve(manifestBlk.cid)).tryGet()).drain() + + var storedData: seq[byte] + for i in 0 ..< manifest.blocksCount: + let blk = (await localStore.getBlock(manifest.treeCid, i)).tryGet() + storedData &= blk.data + + storedData.setLen(manifest.datasetSize.int) # truncate data to original size + check: + storedData == data + test "Retrieve One Block": let testString = "Block 1" diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index a748c98e..2311dc22 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -37,7 +37,7 @@ twonodessuite "REST API": let space = client1.space().tryGet() check: space.totalBlocks == 2 - space.quotaMaxBytes == 8589934592.NBytes + space.quotaMaxBytes == 21474836480.NBytes space.quotaUsedBytes == 65592.NBytes space.quotaReservedBytes == 12.NBytes diff --git a/vendor/nim-serde b/vendor/nim-serde index 69a7a011..c82e85c6 160000 --- a/vendor/nim-serde +++ b/vendor/nim-serde @@ -1 +1 @@ -Subproject commit 69a7a0111addaa4aad885dd4bd7b5ee4684a06de +Subproject commit c82e85c62436218592fbe876df5ac389ef8b964b From 16dce0fc437dfc655bbf18991e205fa8777fe92a Mon Sep 17 00:00:00 2001 From: Slava <20563034+veaceslavdoina@users.noreply.github.com> Date: Tue, 25 Feb 2025 11:19:29 +0200 Subject: [PATCH 2/7] chore: update testnet marketplace address (#1127) https://github.com/codex-storage/nim-codex/issues/1126 --- codex/contracts/deployment.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codex/contracts/deployment.nim b/codex/contracts/deployment.nim index c4e59b80..cc125d18 100644 --- a/codex/contracts/deployment.nim +++ b/codex/contracts/deployment.nim @@ -18,9 +18,9 @@ const knownAddresses = { # Taiko Alpha-3 Testnet "167005": {"Marketplace": Address.init("0x948CF9291b77Bd7ad84781b9047129Addf1b894F")}.toTable, - # Codex Testnet - Nov 25 2024 18:41:29 PM (+00:00 UTC) + # Codex Testnet - Feb 25 2025 07:24:19 AM (+00:00 UTC) "789987": - {"Marketplace": Address.init("0xAB03b6a58C5262f530D54146DA2a552B1C0F7648")}.toTable, + {"Marketplace": Address.init("0xfFaF679D5Cbfdd5Dbc9Be61C616ed115DFb597ed")}.toTable, }.toTable proc getKnownAddress(T: type, chainId: UInt256): ?Address = From fab5e16afda7d9ffd22369cc04603ad9492c8b6b Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 27 Feb 2025 12:29:27 +0100 Subject: [PATCH 3/7] Missing nullability causes json-serialize failure in some generated clients. (#1129) --- openapi.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/openapi.yaml b/openapi.yaml index 70da398b..53a908a3 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -325,6 +325,7 @@ components: - unknown error: type: string + nullable: true description: If Request failed, then here is presented the error message request: $ref: "#/components/schemas/StorageRequest" From 7065718e0912004492c75cbed1036c7c8ec8939e Mon Sep 17 00:00:00 2001 From: Arnaud Date: Thu, 27 Feb 2025 17:58:23 +0100 Subject: [PATCH 4/7] feat(marketplace): indicate that slot is being repaired when trying to download (#1083) * Indicate that slot is being repaired when trying to download * Fix tests * Apply nph * Calculate the repair collateral when adding the item into the queue * Add slotCollateral calculation with getRequest cache and remove populationItem function * Update with pricePerByte * Simplify StorageAsk parameter * Minor fixes * Move cache request to another PR * Rename SlotQueueItem collateral and required in init * Use override func to optimise calls when the slot state is known * Remove unused code * Cosmetic change * Use raiseMarketError helper * Add exceptions to async pragma * Cosmetic change * Use raiseMarketError helper * Let slotCollateral determines the slot sate * Use configSync to avoid async pragma in onStorageRequested * Add loadConfig function * Add CatchableError to async pragma * Add missing pragma raises errors * Move loadConfig * Avoid swallow CancelledError * Avoid swallowing CancelledError * Avoid swallowing CancelledError * Update error messages * Except MarketError instead of CatchableError * Fix merge issue * Log fatal when configuration cannot be loaded * Propagate MarketError in slotCollateral * Remove useless configSync * Use result with explicit error * Fix syntax --------- Signed-off-by: Arnaud --- codex/codex.nim | 4 + codex/contracts/market.nim | 126 +++++++++--- codex/market.nim | 41 +++- codex/node.nim | 17 +- codex/sales.nim | 92 ++++++--- codex/sales/salescontext.nim | 2 +- codex/sales/slotqueue.nim | 55 ++--- codex/sales/states/downloading.nim | 5 +- codex/sales/states/filling.nim | 17 +- tests/codex/helpers/mockmarket.nim | 65 +++++- tests/codex/helpers/mockslotqueueitem.nim | 4 +- tests/codex/node/testcontracts.nim | 2 +- tests/codex/sales/testsales.nim | 41 ++-- tests/codex/sales/testslotqueue.nim | 235 ++++++++++++++-------- tests/contracts/testMarket.nim | 31 +++ tests/examples.nim | 4 +- 16 files changed, 521 insertions(+), 220 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index b8905205..8a03510c 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -134,6 +134,10 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} = if config.simulateProofFailures > 0: warn "Proof failure simulation is not enabled for this build! Configuration ignored" + if error =? (await market.loadConfig()).errorOption: + fatal "Cannot load market configuration", error = error.msg + quit QuitFailure + let purchasing = Purchasing.new(market, clock) let sales = Sales.new(market, clock, repo, proofFailures) client = some ClientInteractions.new(clock, purchasing) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 3c016a59..9079ac8a 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -55,11 +55,17 @@ template convertEthersError(body) = except EthersError as error: raiseMarketError(error.msgDetail) -proc config(market: OnChainMarket): Future[MarketplaceConfig] {.async.} = +proc config( + market: OnChainMarket +): Future[MarketplaceConfig] {.async: (raises: [CancelledError, MarketError]).} = without resolvedConfig =? market.configuration: - let fetchedConfig = await market.contract.configuration() - market.configuration = some fetchedConfig - return fetchedConfig + if err =? (await market.loadConfig()).errorOption: + raiseMarketError(err.msg) + + without config =? market.configuration: + raiseMarketError("Failed to access to config from the Marketplace contract") + + return config return resolvedConfig @@ -70,7 +76,26 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = let token = Erc20Token.new(tokenAddress, market.signer) discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) -method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} = +method loadConfig*( + market: OnChainMarket +): Future[?!void] {.async: (raises: [CancelledError]).} = + try: + without config =? market.configuration: + let fetchedConfig = await market.contract.configuration() + + market.configuration = some fetchedConfig + + return success() + except AsyncLockError, EthersError: + let err = getCurrentException() + return failure newException( + MarketError, + "Failed to fetch the config from the Marketplace contract: " & err.msg, + ) + +method getZkeyHash*( + market: OnChainMarket +): Future[?string] {.async: (raises: [CancelledError, MarketError]).} = let config = await market.config() return some config.proofs.zkeyHash @@ -78,18 +103,24 @@ method getSigner*(market: OnChainMarket): Future[Address] {.async.} = convertEthersError: return await market.signer.getAddress() -method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} = +method periodicity*( + market: OnChainMarket +): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() let period = config.proofs.period return Periodicity(seconds: period) -method proofTimeout*(market: OnChainMarket): Future[uint64] {.async.} = +method proofTimeout*( + market: OnChainMarket +): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.proofs.timeout -method repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} = +method repairRewardPercentage*( + market: OnChainMarket +): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.collateral.repairRewardPercentage @@ -99,7 +130,9 @@ method requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} = let config = await market.config() return config.requestDurationLimit -method proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} = +method proofDowntime*( + market: OnChainMarket +): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.proofs.downtime @@ -128,19 +161,22 @@ method requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} method getRequest*( market: OnChainMarket, id: RequestId -): Future[?StorageRequest] {.async.} = - let key = $id +): Future[?StorageRequest] {.async: (raises: [CancelledError]).} = + try: + let key = $id - if market.requestCache.contains(key): - return some market.requestCache[key] + if key in market.requestCache: + return some market.requestCache[key] - convertEthersError: - try: - let request = await market.contract.getRequest(id) - market.requestCache[key] = request - return some request - except Marketplace_UnknownRequest: - return none StorageRequest + let request = await market.contract.getRequest(id) + market.requestCache[key] = request + return some request + except Marketplace_UnknownRequest, KeyError: + warn "Cannot retrieve the request", error = getCurrentExceptionMsg() + return none StorageRequest + except EthersError, AsyncLockError: + error "Cannot retrieve the request", error = getCurrentExceptionMsg() + return none StorageRequest method requestState*( market: OnChainMarket, requestId: RequestId @@ -152,10 +188,17 @@ method requestState*( except Marketplace_UnknownRequest: return none RequestState -method slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async.} = +method slotState*( + market: OnChainMarket, slotId: SlotId +): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: - let overrides = CallOverrides(blockTag: some BlockTag.pending) - return await market.contract.slotState(slotId, overrides) + try: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return await market.contract.slotState(slotId, overrides) + except AsyncLockError as err: + raiseMarketError( + "Failed to fetch the slot state from the Marketplace contract: " & err.msg + ) method getRequestEnd*( market: OnChainMarket, id: RequestId @@ -507,3 +550,40 @@ method queryPastStorageRequestedEvents*( let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) return await market.queryPastStorageRequestedEvents(fromBlock) + +method slotCollateral*( + market: OnChainMarket, requestId: RequestId, slotIndex: uint64 +): Future[?!UInt256] {.async: (raises: [CancelledError]).} = + let slotid = slotId(requestId, slotIndex) + + try: + let slotState = await market.slotState(slotid) + + without request =? await market.getRequest(requestId): + return failure newException( + MarketError, "Failure calculating the slotCollateral, cannot get the request" + ) + + return market.slotCollateral(request.ask.collateralPerSlot, slotState) + except MarketError as error: + error "Error when trying to calculate the slotCollateral", error = error.msg + return failure error + +method slotCollateral*( + market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState +): ?!UInt256 {.raises: [].} = + if slotState == SlotState.Repair: + without repairRewardPercentage =? + market.configuration .? collateral .? repairRewardPercentage: + return failure newException( + MarketError, + "Failure calculating the slotCollateral, cannot get the reward percentage", + ) + + return success ( + collateralPerSlot - (collateralPerSlot * repairRewardPercentage.u256).div( + 100.u256 + ) + ) + + return success(collateralPerSlot) diff --git a/codex/market.nim b/codex/market.nim index 5417c8e1..c5177aeb 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -62,25 +62,40 @@ type ProofSubmitted* = object of MarketplaceEvent id*: SlotId -method getZkeyHash*(market: Market): Future[?string] {.base, async.} = +method loadConfig*( + market: Market +): Future[?!void] {.base, async: (raises: [CancelledError]).} = + raiseAssert("not implemented") + +method getZkeyHash*( + market: Market +): Future[?string] {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") -method periodicity*(market: Market): Future[Periodicity] {.base, async.} = +method periodicity*( + market: Market +): Future[Periodicity] {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") -method proofTimeout*(market: Market): Future[uint64] {.base, async.} = +method proofTimeout*( + market: Market +): Future[uint64] {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") -method repairRewardPercentage*(market: Market): Future[uint8] {.base, async.} = +method repairRewardPercentage*( + market: Market +): Future[uint8] {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") method requestDurationLimit*(market: Market): Future[uint64] {.base, async.} = raiseAssert("not implemented") -method proofDowntime*(market: Market): Future[uint8] {.base, async.} = +method proofDowntime*( + market: Market +): Future[uint8] {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") method getPointer*(market: Market, slotId: SlotId): Future[uint8] {.base, async.} = @@ -102,7 +117,7 @@ method mySlots*(market: Market): Future[seq[SlotId]] {.base, async.} = method getRequest*( market: Market, id: RequestId -): Future[?StorageRequest] {.base, async.} = +): Future[?StorageRequest] {.base, async: (raises: [CancelledError]).} = raiseAssert("not implemented") method requestState*( @@ -110,7 +125,9 @@ method requestState*( ): Future[?RequestState] {.base, async.} = raiseAssert("not implemented") -method slotState*(market: Market, slotId: SlotId): Future[SlotState] {.base, async.} = +method slotState*( + market: Market, slotId: SlotId +): Future[SlotState] {.base, async: (raises: [CancelledError, MarketError]).} = raiseAssert("not implemented") method getRequestEnd*( @@ -270,3 +287,13 @@ method queryPastStorageRequestedEvents*( market: Market, blocksAgo: int ): Future[seq[StorageRequested]] {.base, async.} = raiseAssert("not implemented") + +method slotCollateral*( + market: Market, requestId: RequestId, slotIndex: uint64 +): Future[?!UInt256] {.base, async: (raises: [CancelledError]).} = + raiseAssert("not implemented") + +method slotCollateral*( + market: Market, collateralPerSlot: UInt256, slotState: SlotState +): ?!UInt256 {.base, gcsafe, raises: [].} = + raiseAssert("not implemented") diff --git a/codex/node.nim b/codex/node.nim index b0f66c90..b248e6df 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -591,7 +591,11 @@ proc requestStorage*( success purchase.id proc onStore( - self: CodexNodeRef, request: StorageRequest, slotIdx: uint64, blocksCb: BlocksCb + self: CodexNodeRef, + request: StorageRequest, + slotIdx: uint64, + blocksCb: BlocksCb, + isRepairing: bool = false, ): Future[?!void] {.async.} = ## store data in local storage ## @@ -604,6 +608,10 @@ proc onStore( trace "Received a request to store a slot" + # TODO: Use the isRepairing to manage the slot download. + # If isRepairing is true, the slot has to be repaired before + # being downloaded. + without manifest =? (await self.fetchManifest(cid)), err: trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) @@ -745,9 +753,12 @@ proc start*(self: CodexNodeRef) {.async.} = if hostContracts =? self.contracts.host: hostContracts.sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, + slot: uint64, + onBatch: BatchProc, + isRepairing: bool = false, ): Future[?!void] = - self.onStore(request, slot, onBatch) + self.onStore(request, slot, onBatch, isRepairing) hostContracts.sales.onExpiryUpdate = proc( rootCid: Cid, expiry: SecondsSince1970 diff --git a/codex/sales.nim b/codex/sales.nim index 91d882b8..af594a9a 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -157,13 +157,28 @@ proc cleanUp( # Re-add items back into the queue to prevent small availabilities from # draining the queue. Seen items will be ordered last. if reprocessSlot and request =? data.request: - let queue = sales.context.slotQueue - var seenItem = SlotQueueItem.init( - data.requestId, data.slotIndex.uint16, data.ask, request.expiry, seen = true - ) - trace "pushing ignored item to queue, marked as seen" - if err =? queue.push(seenItem).errorOption: - error "failed to readd slot to queue", errorType = $(type err), error = err.msg + try: + without collateral =? + await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err: + error "Failed to re-add item back to the slot queue: unable to calculate collateral", + error = err.msg + return + + let queue = sales.context.slotQueue + var seenItem = SlotQueueItem.init( + data.requestId, + data.slotIndex.uint16, + data.ask, + request.expiry, + seen = true, + collateral = collateral, + ) + trace "pushing ignored item to queue, marked as seen" + if err =? queue.push(seenItem).errorOption: + error "failed to readd slot to queue", errorType = $(type err), error = err.msg + except MarketError as e: + error "Failed to re-add item back to the slot queue.", error = e.msg + return await sales.remove(agent) @@ -283,7 +298,7 @@ proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = proc onStorageRequested( sales: Sales, requestId: RequestId, ask: StorageAsk, expiry: uint64 -) = +) {.raises: [].} = logScope: topics = "marketplace sales onStorageRequested" requestId @@ -294,7 +309,14 @@ proc onStorageRequested( trace "storage requested, adding slots to queue" - without items =? SlotQueueItem.init(requestId, ask, expiry).catch, err: + let market = sales.context.market + + without collateral =? market.slotCollateral(ask.collateralPerSlot, SlotState.Free), + err: + error "Request failure, unable to calculate collateral", error = err.msg + return + + without items =? SlotQueueItem.init(requestId, ask, expiry, collateral).catch, err: if err of SlotsOutOfRangeError: warn "Too many slots, cannot add to queue" else: @@ -319,35 +341,45 @@ proc onSlotFreed(sales: Sales, requestId: RequestId, slotIndex: uint64) = trace "slot freed, adding to queue" - proc addSlotToQueue() {.async: (raises: []).} = + proc addSlotToQueue() {.async: (raises: [CancelledError]).} = let context = sales.context let market = context.market let queue = context.slotQueue + without request =? (await market.getRequest(requestId)), err: + error "unknown request in contract", error = err.msgDetail + return + + # Take the repairing state into consideration to calculate the collateral. + # This is particularly needed because it will affect the priority in the queue + # and we want to give the user the ability to tweak the parameters. + # Adding the repairing state directly in the queue priority calculation + # would not allow this flexibility. + without collateral =? + market.slotCollateral(request.ask.collateralPerSlot, SlotState.Repair), err: + error "Failed to add freed slot to queue: unable to calculate collateral", + error = err.msg + return + if slotIndex > uint16.high.uint64: error "Cannot cast slot index to uint16, value = ", slotIndex return - # first attempt to populate request using existing metadata in queue - without var found =? queue.populateItem(requestId, slotIndex.uint16): - trace "no existing request metadata, getting request info from contract" - # if there's no existing slot for that request, retrieve the request - # from the contract. - try: - without request =? await market.getRequest(requestId): - error "unknown request in contract" - return + without slotQueueItem =? + SlotQueueItem.init(request, slotIndex.uint16, collateral = collateral).catch, err: + warn "Too many slots, cannot add to queue", error = err.msgDetail + return - found = SlotQueueItem.init(request, slotIndex.uint16) - except CancelledError: - discard # do not propagate as addSlotToQueue was asyncSpawned - except CatchableError as e: - error "failed to get request from contract and add slots to queue", - error = e.msgDetail - - if err =? queue.push(found).errorOption: - error "failed to push slot items to queue", error = err.msgDetail + if err =? queue.push(slotQueueItem).errorOption: + if err of SlotQueueItemExistsError: + error "Failed to push item to queue becaue it already exists", + error = err.msgDetail + elif err of QueueNotRunningError: + warn "Failed to push item to queue becaue queue is not running", + error = err.msgDetail + # We could get rid of this by adding the storage ask in the SlotFreed event, + # so we would not need to call getRequest to get the collateralPerSlot. let fut = addSlotToQueue() sales.trackedFutures.track(fut) asyncSpawn fut @@ -356,7 +388,9 @@ proc subscribeRequested(sales: Sales) {.async.} = let context = sales.context let market = context.market - proc onStorageRequested(requestId: RequestId, ask: StorageAsk, expiry: uint64) = + proc onStorageRequested( + requestId: RequestId, ask: StorageAsk, expiry: uint64 + ) {.raises: [].} = sales.onStorageRequested(requestId, ask, expiry) try: diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index 6e6a3568..af940a4b 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -26,7 +26,7 @@ type BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].} OnStore* = proc( - request: StorageRequest, slot: uint64, blocksCb: BlocksCb + request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool ): Future[?!void] {.gcsafe, upraises: [].} OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {. gcsafe, upraises: [] diff --git a/codex/sales/slotqueue.nim b/codex/sales/slotqueue.nim index a032d46b..fa57a983 100644 --- a/codex/sales/slotqueue.nim +++ b/codex/sales/slotqueue.nim @@ -34,7 +34,7 @@ type slotSize: uint64 duration: uint64 pricePerBytePerSecond: UInt256 - collateralPerByte: UInt256 + collateral: UInt256 # Collateral computed expiry: uint64 seen: bool @@ -76,9 +76,6 @@ proc profitability(item: SlotQueueItem): UInt256 = slotSize: item.slotSize, ).pricePerSlot -proc collateralPerSlot(item: SlotQueueItem): UInt256 = - StorageAsk(collateralPerByte: item.collateralPerByte, slotSize: item.slotSize).collateralPerSlot - proc `<`*(a, b: SlotQueueItem): bool = # for A to have a higher priority than B (in a min queue), A must be less than # B. @@ -95,8 +92,8 @@ proc `<`*(a, b: SlotQueueItem): bool = scoreA.addIf(a.profitability > b.profitability, 3) scoreB.addIf(a.profitability < b.profitability, 3) - scoreA.addIf(a.collateralPerSlot < b.collateralPerSlot, 2) - scoreB.addIf(a.collateralPerSlot > b.collateralPerSlot, 2) + scoreA.addIf(a.collateral < b.collateral, 2) + scoreB.addIf(a.collateral > b.collateral, 2) scoreA.addIf(a.expiry > b.expiry, 1) scoreB.addIf(a.expiry < b.expiry, 1) @@ -137,6 +134,7 @@ proc init*( slotIndex: uint16, ask: StorageAsk, expiry: uint64, + collateral: UInt256, seen = false, ): SlotQueueItem = SlotQueueItem( @@ -145,25 +143,32 @@ proc init*( slotSize: ask.slotSize, duration: ask.duration, pricePerBytePerSecond: ask.pricePerBytePerSecond, - collateralPerByte: ask.collateralPerByte, + collateral: collateral, expiry: expiry, seen: seen, ) proc init*( - _: type SlotQueueItem, request: StorageRequest, slotIndex: uint16 + _: type SlotQueueItem, + request: StorageRequest, + slotIndex: uint16, + collateral: UInt256, ): SlotQueueItem = - SlotQueueItem.init(request.id, slotIndex, request.ask, request.expiry) + SlotQueueItem.init(request.id, slotIndex, request.ask, request.expiry, collateral) proc init*( - _: type SlotQueueItem, requestId: RequestId, ask: StorageAsk, expiry: uint64 -): seq[SlotQueueItem] = + _: type SlotQueueItem, + requestId: RequestId, + ask: StorageAsk, + expiry: uint64, + collateral: UInt256, +): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} = if not ask.slots.inRange: raise newException(SlotsOutOfRangeError, "Too many slots") var i = 0'u16 proc initSlotQueueItem(): SlotQueueItem = - let item = SlotQueueItem.init(requestId, i, ask, expiry) + let item = SlotQueueItem.init(requestId, i, ask, expiry, collateral) inc i return item @@ -171,8 +176,10 @@ proc init*( Rng.instance.shuffle(items) return items -proc init*(_: type SlotQueueItem, request: StorageRequest): seq[SlotQueueItem] = - return SlotQueueItem.init(request.id, request.ask, request.expiry) +proc init*( + _: type SlotQueueItem, request: StorageRequest, collateral: UInt256 +): seq[SlotQueueItem] = + return SlotQueueItem.init(request.id, request.ask, request.expiry, collateral) proc inRange*(val: SomeUnsignedInt): bool = val.uint16 in SlotQueueSize.low .. SlotQueueSize.high @@ -234,25 +241,7 @@ proc unpause*(self: SlotQueue) = # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait() self.unpaused.fire() -proc populateItem*( - self: SlotQueue, requestId: RequestId, slotIndex: uint16 -): ?SlotQueueItem = - trace "populate item, items in queue", len = self.queue.len - for item in self.queue.items: - trace "populate item search", itemRequestId = item.requestId, requestId - if item.requestId == requestId: - return some SlotQueueItem( - requestId: requestId, - slotIndex: slotIndex, - slotSize: item.slotSize, - duration: item.duration, - pricePerBytePerSecond: item.pricePerBytePerSecond, - collateralPerByte: item.collateralPerByte, - expiry: item.expiry, - ) - return none SlotQueueItem - -proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = +proc push*(self: SlotQueue, item: SlotQueueItem): ?!void {.raises: [].} = logScope: requestId = item.requestId slotIndex = item.slotIndex diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 39137545..7cf304d3 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -67,8 +67,11 @@ method run*( return await reservations.release(reservation.id, reservation.availabilityId, bytes) try: + let slotId = slotId(request.id, data.slotIndex) + let isRepairing = (await context.market.slotState(slotId)) == SlotState.Repair + trace "Starting download" - if err =? (await onStore(request, data.slotIndex, onBlocks)).errorOption: + if err =? (await onStore(request, data.slotIndex, onBlocks, isRepairing)).errorOption: return some State(SaleErrored(error: err, reprocessSlot: false)) trace "Download complete" diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 0c20a64e..03e2ef2b 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -38,18 +38,11 @@ method run*( slotIndex = data.slotIndex try: - let slotState = await market.slotState(slotId(data.requestId, data.slotIndex)) - let requestedCollateral = request.ask.collateralPerSlot - var collateral: UInt256 - - if slotState == SlotState.Repair: - # When repairing the node gets "discount" on the collateral that it needs to - let repairRewardPercentage = (await market.repairRewardPercentage).u256 - collateral = - requestedCollateral - - ((requestedCollateral * repairRewardPercentage)).div(100.u256) - else: - collateral = requestedCollateral + without collateral =? await market.slotCollateral(data.requestId, data.slotIndex), + err: + error "Failure attempting to fill slot: unable to calculate collateral", + error = err.msg + return debug "Filling slot" try: diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 48b20f28..16806cb2 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -138,22 +138,35 @@ proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket = signer: Address.example, config: config, canReserveSlot: true, clock: clock ) +method loadConfig*( + market: MockMarket +): Future[?!void] {.async: (raises: [CancelledError]).} = + discard + method getSigner*(market: MockMarket): Future[Address] {.async.} = return market.signer -method periodicity*(mock: MockMarket): Future[Periodicity] {.async.} = +method periodicity*( + mock: MockMarket +): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} = return Periodicity(seconds: mock.config.proofs.period) -method proofTimeout*(market: MockMarket): Future[uint64] {.async.} = +method proofTimeout*( + market: MockMarket +): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = return market.config.proofs.timeout method requestDurationLimit*(market: MockMarket): Future[uint64] {.async.} = return market.config.requestDurationLimit -method proofDowntime*(market: MockMarket): Future[uint8] {.async.} = +method proofDowntime*( + market: MockMarket +): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = return market.config.proofs.downtime -method repairRewardPercentage*(market: MockMarket): Future[uint8] {.async.} = +method repairRewardPercentage*( + market: MockMarket +): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = return market.config.collateral.repairRewardPercentage method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} = @@ -173,7 +186,7 @@ method mySlots*(market: MockMarket): Future[seq[SlotId]] {.async.} = method getRequest*( market: MockMarket, id: RequestId -): Future[?StorageRequest] {.async.} = +): Future[?StorageRequest] {.async: (raises: [CancelledError]).} = for request in market.requested: if request.id == id: return some request @@ -191,10 +204,16 @@ method requestState*( ): Future[?RequestState] {.async.} = return market.requestState .? [requestId] -method slotState*(market: MockMarket, slotId: SlotId): Future[SlotState] {.async.} = - if not market.slotState.hasKey(slotId): +method slotState*( + market: MockMarket, slotId: SlotId +): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} = + if slotId notin market.slotState: return SlotState.Free - return market.slotState[slotId] + + try: + return market.slotState[slotId] + except KeyError as e: + raiseAssert "SlotId not found in known slots (MockMarket.slotState)" method getRequestEnd*( market: MockMarket, id: RequestId @@ -534,3 +553,33 @@ method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} = method unsubscribe*(subscription: SlotReservationsFullSubscription) {.async.} = subscription.market.subscriptions.onSlotReservationsFull.keepItIf(it != subscription) + +method slotCollateral*( + market: MockMarket, requestId: RequestId, slotIndex: uint64 +): Future[?!UInt256] {.async: (raises: [CancelledError]).} = + let slotid = slotId(requestId, slotIndex) + + try: + let state = await slotState(market, slotid) + + without request =? await market.getRequest(requestId): + return failure newException( + MarketError, "Failure calculating the slotCollateral, cannot get the request" + ) + + return market.slotCollateral(request.ask.collateralPerSlot, state) + except MarketError as error: + error "Error when trying to calculate the slotCollateral", error = error.msg + return failure error + +method slotCollateral*( + market: MockMarket, collateralPerSlot: UInt256, slotState: SlotState +): ?!UInt256 {.raises: [].} = + if slotState == SlotState.Repair: + let repairRewardPercentage = market.config.collateral.repairRewardPercentage.u256 + + return success ( + collateralPerSlot - (collateralPerSlot * repairRewardPercentage).div(100.u256) + ) + + return success collateralPerSlot diff --git a/tests/codex/helpers/mockslotqueueitem.nim b/tests/codex/helpers/mockslotqueueitem.nim index 7a1505ec..8657850f 100644 --- a/tests/codex/helpers/mockslotqueueitem.nim +++ b/tests/codex/helpers/mockslotqueueitem.nim @@ -7,7 +7,7 @@ type MockSlotQueueItem* = object slotSize*: uint64 duration*: uint64 pricePerBytePerSecond*: UInt256 - collateralPerByte*: UInt256 + collateral*: UInt256 expiry*: uint64 seen*: bool @@ -19,8 +19,8 @@ proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem = slotSize: item.slotSize, duration: item.duration, pricePerBytePerSecond: item.pricePerBytePerSecond, - collateralPerByte: item.collateralPerByte, ), expiry = item.expiry, seen = item.seen, + collateral = item.collateral, ) diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 11f4f273..73dd8daf 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -125,7 +125,7 @@ asyncchecksuite "Test Node - Host contracts": fetchedBytes += blk.data.len.uint return success() - (await onStore(request, 1.uint64, onBlocks)).tryGet() + (await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet() check fetchedBytes == 12 * DefaultBlockSize.uint let indexer = verifiable.protectedStrategy.init( diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index f078cbee..e92f9607 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -62,7 +62,7 @@ asyncchecksuite "Sales - start": sales = Sales.new(market, clock, repo) reservations = sales.context.reservations sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = return success() @@ -181,7 +181,7 @@ asyncchecksuite "Sales": sales = Sales.new(market, clock, repo) reservations = sales.context.reservations sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = return success() @@ -229,7 +229,7 @@ asyncchecksuite "Sales": availability = a.get # update id proc notProcessed(itemsProcessed: seq[SlotQueueItem], request: StorageRequest): bool = - let items = SlotQueueItem.init(request) + let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) for i in 0 ..< items.len: if itemsProcessed.contains(items[i]): return false @@ -266,7 +266,7 @@ asyncchecksuite "Sales": done.complete() createAvailability() await market.requestStorage(request) - let items = SlotQueueItem.init(request) + let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) check eventually items.allIt(itemsProcessed.contains(it)) test "removes slots from slot queue once RequestCancelled emitted": @@ -287,13 +287,15 @@ asyncchecksuite "Sales": test "removes slot index from slot queue once SlotFilled emitted": let request1 = await addRequestToSaturatedQueue() market.emitSlotFilled(request1.id, 1.uint64) - let expected = SlotQueueItem.init(request1, 1'u16) + let expected = + SlotQueueItem.init(request1, 1'u16, collateral = request1.ask.collateralPerSlot) check always (not itemsProcessed.contains(expected)) test "removes slot index from slot queue once SlotReservationsFull emitted": let request1 = await addRequestToSaturatedQueue() market.emitSlotReservationsFull(request1.id, 1.uint64) - let expected = SlotQueueItem.init(request1, 1'u16) + let expected = + SlotQueueItem.init(request1, 1'u16, collateral = request1.ask.collateralPerSlot) check always (not itemsProcessed.contains(expected)) test "adds slot index to slot queue once SlotFreed emitted": @@ -303,14 +305,21 @@ asyncchecksuite "Sales": createAvailability() market.requested.add request # "contract" must be able to return request + market.emitSlotFreed(request.id, 2.uint64) - let expected = SlotQueueItem.init(request, 2.uint16) + without collateralPerSlot =? await market.slotCollateral(request.id, 2.uint64), + error: + fail() + + let expected = + SlotQueueItem.init(request, 2.uint16, collateral = request.ask.collateralPerSlot) + check eventually itemsProcessed.contains(expected) test "items in queue are readded (and marked seen) once ignored": await market.requestStorage(request) - let items = SlotQueueItem.init(request) + let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) check eventually queue.len > 0 # queue starts paused, allow items to be added to the queue check eventually queue.paused @@ -331,7 +340,7 @@ asyncchecksuite "Sales": test "queue is paused once availability is insufficient to service slots in queue": createAvailability() # enough to fill a single slot await market.requestStorage(request) - let items = SlotQueueItem.init(request) + let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) check eventually queue.len > 0 # queue starts paused, allow items to be added to the queue check eventually queue.paused @@ -348,7 +357,7 @@ asyncchecksuite "Sales": test "availability size is reduced by request slot size when fully downloaded": sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = let blk = bt.Block.new(@[1.byte]).get await onBatch(blk.repeat(request.ask.slotSize.int)) @@ -361,7 +370,7 @@ asyncchecksuite "Sales": test "non-downloaded bytes are returned to availability once finished": var slotIndex = 0.uint64 sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = slotIndex = slot let blk = bt.Block.new(@[1.byte]).get @@ -421,7 +430,7 @@ asyncchecksuite "Sales": var storingRequest: StorageRequest var storingSlot: uint64 sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = storingRequest = request storingSlot = slot @@ -434,7 +443,7 @@ asyncchecksuite "Sales": test "makes storage available again when data retrieval fails": let error = newException(IOError, "data retrieval failed") sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = return failure(error) createAvailability() @@ -503,7 +512,7 @@ asyncchecksuite "Sales": test "makes storage available again when other host fills the slot": let otherHost = Address.example sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) return success() @@ -519,7 +528,7 @@ asyncchecksuite "Sales": let origSize = availability.freeSize sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) return success() @@ -544,7 +553,7 @@ asyncchecksuite "Sales": let origSize = availability.freeSize sales.onStore = proc( - request: StorageRequest, slot: uint64, onBatch: BatchProc + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) return success() diff --git a/tests/codex/sales/testslotqueue.nim b/tests/codex/sales/testslotqueue.nim index 46c35b1c..03c658be 100644 --- a/tests/codex/sales/testslotqueue.nim +++ b/tests/codex/sales/testslotqueue.nim @@ -159,8 +159,10 @@ suite "Slot queue": requestB.ask.collateralPerByte = 1.u256 requestB.expiry = 1000.uint64 - let itemA = SlotQueueItem.init(requestA, 0) - let itemB = SlotQueueItem.init(requestB, 0) + let itemA = + SlotQueueItem.init(requestA, 0, collateral = requestA.ask.collateralPerSlot) + let itemB = + SlotQueueItem.init(requestB, 0, collateral = requestB.ask.collateralPerSlot) check itemB < itemA # B higher priority than A check itemA > itemB @@ -172,7 +174,7 @@ suite "Slot queue": slotSize: 1.uint64, duration: 1.uint64, pricePerBytePerSecond: 2.u256, # profitability is higher (good) - collateralPerByte: 1.u256, + collateral: 1.u256, expiry: 1.uint64, seen: true, # seen (bad), more weight than profitability ) @@ -182,7 +184,7 @@ suite "Slot queue": slotSize: 1.uint64, duration: 1.uint64, pricePerBytePerSecond: 1.u256, # profitability is lower (bad) - collateralPerByte: 1.u256, + collateral: 1.u256, expiry: 1.uint64, seen: false, # not seen (good) ) @@ -197,7 +199,7 @@ suite "Slot queue": slotSize: 1.uint64, duration: 1.uint64, pricePerBytePerSecond: 1.u256, # reward is lower (bad) - collateralPerByte: 1.u256, # collateral is lower (good) + collateral: 1.u256, # collateral is lower (good) expiry: 1.uint64, seen: false, ) @@ -208,7 +210,7 @@ suite "Slot queue": duration: 1.uint64, pricePerBytePerSecond: 2.u256, # reward is higher (good), more weight than collateral - collateralPerByte: 2.u256, # collateral is higher (bad) + collateral: 2.u256, # collateral is higher (bad) expiry: 1.uint64, seen: false, ) @@ -223,7 +225,7 @@ suite "Slot queue": slotSize: 1.uint64, duration: 1.uint64, pricePerBytePerSecond: 1.u256, - collateralPerByte: 2.u256, # collateral is higher (bad) + collateral: 2.u256, # collateral is higher (bad) expiry: 2.uint64, # expiry is longer (good) seen: false, ) @@ -233,7 +235,7 @@ suite "Slot queue": slotSize: 1.uint64, duration: 1.uint64, pricePerBytePerSecond: 1.u256, - collateralPerByte: 1.u256, # collateral is lower (good), more weight than expiry + collateral: 1.u256, # collateral is lower (good), more weight than expiry expiry: 1.uint64, # expiry is shorter (bad) seen: false, ) @@ -248,7 +250,7 @@ suite "Slot queue": slotSize: 1.uint64, # slotSize is smaller (good) duration: 1.uint64, pricePerBytePerSecond: 1.u256, - collateralPerByte: 1.u256, + collateral: 1.u256, expiry: 1.uint64, # expiry is shorter (bad) seen: false, ) @@ -258,7 +260,7 @@ suite "Slot queue": slotSize: 2.uint64, # slotSize is larger (bad) duration: 1.uint64, pricePerBytePerSecond: 1.u256, - collateralPerByte: 1.u256, + collateral: 1.u256, expiry: 2.uint64, # expiry is longer (good), more weight than slotSize seen: false, ) @@ -273,7 +275,7 @@ suite "Slot queue": slotSize: 2.uint64, # slotSize is larger (bad) duration: 1.uint64, pricePerBytePerSecond: 1.u256, - collateralPerByte: 1.u256, + collateral: 1.u256, expiry: 1.uint64, # expiry is shorter (bad) seen: false, ) @@ -283,7 +285,7 @@ suite "Slot queue": slotSize: 1.uint64, # slotSize is smaller (good) duration: 1.uint64, pricePerBytePerSecond: 1.u256, - collateralPerByte: 1.u256, + collateral: 1.u256, expiry: 1.uint64, seen: false, ) @@ -292,11 +294,16 @@ suite "Slot queue": test "expands available all possible slot indices on init": let request = StorageRequest.example - let items = SlotQueueItem.init(request) + let items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) check items.len.uint64 == request.ask.slots var checked = 0 for slotIndex in 0'u16 ..< request.ask.slots.uint16: - check items.anyIt(it == SlotQueueItem.init(request, slotIndex)) + check items.anyIt( + it == + SlotQueueItem.init( + request, slotIndex, collateral = request.ask.collateralPerSlot + ) + ) inc checked check checked == items.len @@ -322,34 +329,17 @@ suite "Slot queue": check isOk queue.push(item3) check isOk queue.push(item4) - test "populates item with exisiting request metadata": - newSlotQueue(maxSize = 8, maxWorkers = 1, processSlotDelay = 10.millis) - let request0 = StorageRequest.example - var request1 = StorageRequest.example - request1.ask.collateralPerByte += 1.u256 - let items0 = SlotQueueItem.init(request0) - let items1 = SlotQueueItem.init(request1) - check queue.push(items0).isOk - check queue.push(items1).isOk - let populated = !queue.populateItem(request1.id, 12'u16) - check populated.requestId == request1.id - check populated.slotIndex == 12'u16 - check populated.slotSize == request1.ask.slotSize - check populated.duration == request1.ask.duration - check populated.pricePerBytePerSecond == request1.ask.pricePerBytePerSecond - check populated.collateralPerByte == request1.ask.collateralPerByte - - test "does not find exisiting request metadata": - newSlotQueue(maxSize = 2, maxWorkers = 2) - let item = SlotQueueItem.example - check queue.populateItem(item.requestId, 12'u16).isNone - test "can support uint16.high slots": var request = StorageRequest.example let maxUInt16 = uint16.high let uint64Slots = uint64(maxUInt16) request.ask.slots = uint64Slots - let items = SlotQueueItem.init(request.id, request.ask, request.expiry) + let items = SlotQueueItem.init( + request.id, + request.ask, + request.expiry, + collateral = request.ask.collateralPerSlot, + ) check items.len.uint16 == maxUInt16 test "cannot support greater than uint16.high slots": @@ -358,7 +348,12 @@ suite "Slot queue": let uint64Slots = uint64(int32Slots) request.ask.slots = uint64Slots expect SlotsOutOfRangeError: - discard SlotQueueItem.init(request.id, request.ask, request.expiry) + discard SlotQueueItem.init( + request.id, + request.ask, + request.expiry, + collateral = request.ask.collateralPerSlot, + ) test "cannot push duplicate items": newSlotQueue(maxSize = 6, maxWorkers = 1, processSlotDelay = 15.millis) @@ -399,8 +394,10 @@ suite "Slot queue": let request0 = StorageRequest.example var request1 = StorageRequest.example request1.ask.collateralPerByte += 1.u256 - let items0 = SlotQueueItem.init(request0) - let items1 = SlotQueueItem.init(request1) + let items0 = + SlotQueueItem.init(request0, collateral = request0.ask.collateralPerSlot) + let items1 = + SlotQueueItem.init(request1, collateral = request1.ask.collateralPerSlot) check queue.push(items0).isOk check queue.push(items1).isOk let last = items1[items1.high] @@ -413,8 +410,10 @@ suite "Slot queue": let request0 = StorageRequest.example var request1 = StorageRequest.example request1.ask.collateralPerByte += 1.u256 - let items0 = SlotQueueItem.init(request0) - let items1 = SlotQueueItem.init(request1) + let items0 = + SlotQueueItem.init(request0, collateral = request0.ask.collateralPerSlot) + let items1 = + SlotQueueItem.init(request1, collateral = request1.ask.collateralPerSlot) check queue.push(items0).isOk check queue.push(items1).isOk queue.delete(request1.id) @@ -433,42 +432,56 @@ suite "Slot queue": request3.ask.collateralPerByte = request2.ask.collateralPerByte + 1 request4.ask.collateralPerByte = request3.ask.collateralPerByte + 1 request5.ask.collateralPerByte = request4.ask.collateralPerByte + 1 - let item0 = SlotQueueItem.init(request0, 0) - let item1 = SlotQueueItem.init(request1, 0) - let item2 = SlotQueueItem.init(request2, 0) - let item3 = SlotQueueItem.init(request3, 0) - let item4 = SlotQueueItem.init(request4, 0) - let item5 = SlotQueueItem.init(request5, 0) + let item0 = + SlotQueueItem.init(request0, 0, collateral = request0.ask.collateralPerSlot) + let item1 = + SlotQueueItem.init(request1, 0, collateral = request1.ask.collateralPerSlot) + let item2 = + SlotQueueItem.init(request2, 0, collateral = request2.ask.collateralPerSlot) + let item3 = + SlotQueueItem.init(request3, 0, collateral = request3.ask.collateralPerSlot) + let item4 = + SlotQueueItem.init(request4, 0, collateral = request4.ask.collateralPerSlot) + let item5 = + SlotQueueItem.init(request5, 0, collateral = request5.ask.collateralPerSlot) check queue.contains(item5) == false check queue.push(@[item0, item1, item2, item3, item4, item5]).isOk check queue.contains(item5) test "sorts items by profitability descending (higher pricePerBytePerSecond == higher priority == goes first in the list)": var request = StorageRequest.example - let item0 = SlotQueueItem.init(request, 0) + let item0 = + SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item1 = SlotQueueItem.init(request, 1) + let item1 = + SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot) check item1 < item0 - test "sorts items by collateral ascending (higher required collateralPerByte = lower priority == comes later in the list)": + test "sorts items by collateral ascending (higher required collateral = lower priority == comes later in the list)": var request = StorageRequest.example - let item0 = SlotQueueItem.init(request, 0) - request.ask.collateralPerByte += 1.u256 - let item1 = SlotQueueItem.init(request, 1) + let item0 = + SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) + let item1 = SlotQueueItem.init( + request, 1, collateral = request.ask.collateralPerSlot + 1.u256 + ) check item1 > item0 test "sorts items by expiry descending (longer expiry = higher priority)": var request = StorageRequest.example - let item0 = SlotQueueItem.init(request, 0) + let item0 = + SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) request.expiry += 1 - let item1 = SlotQueueItem.init(request, 1) + let item1 = + SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot) check item1 < item0 test "sorts items by slot size descending (bigger dataset = higher profitability = higher priority)": var request = StorageRequest.example - let item0 = SlotQueueItem.init(request, 0) + let item0 = + SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) request.ask.slotSize += 1 - let item1 = SlotQueueItem.init(request, 1) + let item1 = + SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot) check item1 < item0 test "should call callback once an item is added": @@ -489,13 +502,17 @@ suite "Slot queue": # sleeping after push allows the slotqueue loop to iterate, # calling the callback for each pushed/updated item var request = StorageRequest.example - let item0 = SlotQueueItem.init(request, 0) + let item0 = + SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item1 = SlotQueueItem.init(request, 1) + let item1 = + SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item2 = SlotQueueItem.init(request, 2) + let item2 = + SlotQueueItem.init(request, 2, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item3 = SlotQueueItem.init(request, 3) + let item3 = + SlotQueueItem.init(request, 3, collateral = request.ask.collateralPerSlot) check queue.push(item0).isOk await sleepAsync(1.millis) @@ -520,13 +537,17 @@ suite "Slot queue": # sleeping after push allows the slotqueue loop to iterate, # calling the callback for each pushed/updated item var request = StorageRequest.example - let item0 = SlotQueueItem.init(request, 0) + let item0 = + SlotQueueItem.init(request, 0, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item1 = SlotQueueItem.init(request, 1) + let item1 = + SlotQueueItem.init(request, 1, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item2 = SlotQueueItem.init(request, 2) + let item2 = + SlotQueueItem.init(request, 2, collateral = request.ask.collateralPerSlot) request.ask.pricePerBytePerSecond += 1.u256 - let item3 = SlotQueueItem.init(request, 3) + let item3 = + SlotQueueItem.init(request, 3, collateral = request.ask.collateralPerSlot) check queue.push(item0).isOk check queue.push(item1).isOk @@ -550,7 +571,7 @@ suite "Slot queue": queue.pause let request = StorageRequest.example - var items = SlotQueueItem.init(request) + var items = SlotQueueItem.init(request, collateral = request.ask.collateralPerSlot) check queue.push(items).isOk # check all items processed check eventually queue.len == 0 @@ -558,8 +579,14 @@ suite "Slot queue": test "pushing seen item does not unpause queue": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example - let item0 = - SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = true) + let item0 = SlotQueueItem.init( + request.id, + 0'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = true, + ) check queue.paused check queue.push(item0).isOk check queue.paused @@ -567,8 +594,14 @@ suite "Slot queue": test "paused queue waits for unpause before continuing processing": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example - let item = - SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = false) + let item = SlotQueueItem.init( + request.id, + 1'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = false, + ) check queue.paused # push causes unpause check queue.push(item).isOk @@ -579,10 +612,22 @@ suite "Slot queue": test "processing a 'seen' item pauses the queue": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example - let unseen = - SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false) - let seen = - SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true) + let unseen = SlotQueueItem.init( + request.id, + 0'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = false, + ) + let seen = SlotQueueItem.init( + request.id, + 1'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = true, + ) # push causes unpause check queue.push(unseen).isSuccess # check all items processed @@ -595,10 +640,22 @@ suite "Slot queue": test "processing a 'seen' item does not decrease the number of workers": newSlotQueue(maxSize = 4, maxWorkers = 4) let request = StorageRequest.example - let unseen = - SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false) - let seen = - SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true) + let unseen = SlotQueueItem.init( + request.id, + 0'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = false, + ) + let seen = SlotQueueItem.init( + request.id, + 1'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = true, + ) # push seen item to ensure that queue is pausing check queue.push(seen).isSuccess # unpause and pause a number of times @@ -615,10 +672,22 @@ suite "Slot queue": test "item 'seen' flags can be cleared": newSlotQueue(maxSize = 4, maxWorkers = 1) let request = StorageRequest.example - let item0 = - SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = true) - let item1 = - SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true) + let item0 = SlotQueueItem.init( + request.id, + 0'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = true, + ) + let item1 = SlotQueueItem.init( + request.id, + 1'u16, + request.ask, + request.expiry, + request.ask.collateralPerSlot, + seen = true, + ) check queue.push(item0).isOk check queue.push(item1).isOk check queue[0].seen diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index 74d6a65e..068a4d2e 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -598,6 +598,37 @@ ethersuite "On-Chain Market": check endBalanceHost == (startBalanceHost + request.ask.collateralPerSlot) check endBalanceReward == (startBalanceReward + expectedPayout) + test "returns the collateral when the slot is not being repaired": + await market.requestStorage(request) + await market.reserveSlot(request.id, 0.uint64) + await market.fillSlot(request.id, 0.uint64, proof, request.ask.collateralPerSlot) + + let slotId = request.slotId(0.uint64) + without collateral =? await market.slotCollateral(request.id, 0.uint64), error: + fail() + + check collateral == request.ask.collateralPerSlot + + test "calculates correctly the collateral when the slot is being repaired": + # Ensure that the config is loaded and repairRewardPercentage is available + discard await market.repairRewardPercentage() + + await market.requestStorage(request) + await market.reserveSlot(request.id, 0.uint64) + await market.fillSlot(request.id, 0.uint64, proof, request.ask.collateralPerSlot) + await market.freeSlot(slotId(request.id, 0.uint64)) + + let slotId = request.slotId(0.uint64) + + without collateral =? await market.slotCollateral(request.id, 0.uint64), error: + fail() + + # slotCollateral + # repairRewardPercentage = 10 + # expected collateral = slotCollateral - slotCollateral * 0.1 + check collateral == + request.ask.collateralPerSlot - (request.ask.collateralPerSlot * 10).div(100.u256) + test "the request is added in cache after the fist access": await market.requestStorage(request) diff --git a/tests/examples.nim b/tests/examples.nim index 9b88b4a5..9ef4e292 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -72,7 +72,9 @@ proc example*(_: type Slot): Slot = proc example*(_: type SlotQueueItem): SlotQueueItem = let request = StorageRequest.example let slot = Slot.example - SlotQueueItem.init(request, slot.slotIndex.uint16) + SlotQueueItem.init( + request, slot.slotIndex.uint16, collateral = request.ask.collateralPerSlot + ) proc example(_: type G1Point): G1Point = G1Point(x: UInt256.example, y: UInt256.example) From eb09e610d5e1c649f32877d4d924332677a5fdd4 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 5 Mar 2025 09:35:46 +0100 Subject: [PATCH 5/7] fix(ci): handle coverage as a string to enable gcc 14 on linux (#1140) * Handle coverage as a string not a boolean * Update ubuntu version to latest --- .github/actions/nimbus-build-system/action.yml | 4 ++-- .github/workflows/nim-matrix.yml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/actions/nimbus-build-system/action.yml b/.github/actions/nimbus-build-system/action.yml index 219966db..5d1917e3 100644 --- a/.github/actions/nimbus-build-system/action.yml +++ b/.github/actions/nimbus-build-system/action.yml @@ -89,7 +89,7 @@ runs: - name: Install gcc 14 on Linux # We don't want to install gcc 14 for coverage (Ubuntu 20.04) - if : ${{ inputs.os == 'linux' && !inputs.coverage }} + if : ${{ inputs.os == 'linux' && inputs.coverage != 'true' }} shell: ${{ inputs.shell }} {0} run: | # Add GCC-14 to alternatives @@ -202,7 +202,7 @@ runs: - name: Restore Nim toolchain binaries from cache id: nim-cache uses: actions/cache@v4 - if : ${{ !inputs.coverage }} + if : ${{ inputs.coverage != 'true' }} with: path: NimBinaries key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_version }}-cache-${{ env.cache_nonce }}-${{ github.run_id }} diff --git a/.github/workflows/nim-matrix.yml b/.github/workflows/nim-matrix.yml index 4d86d3bb..71129574 100644 --- a/.github/workflows/nim-matrix.yml +++ b/.github/workflows/nim-matrix.yml @@ -20,10 +20,10 @@ jobs: uses: fabiocaccamo/create-matrix-action@v5 with: matrix: | - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} - os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} + os {linux}, cpu {amd64}, builder {ubuntu-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail} build: needs: matrix From 2a3a29720f8c3a6d8cb64c6e463dc3af0cf45c8b Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Mon, 10 Mar 2025 10:27:16 -0300 Subject: [PATCH 6/7] Fixes Codex crashes on interrupted downloads (#1151) * fix: fixes Codex crashes on interrupted downloads * fix: add better feedback to 404, minor rewording in test comment --- codex/rest/api.nim | 24 +++++++++++++++++++----- tests/integration/testrestapi.nim | 30 ++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 89dbe220..6b8f2ac1 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -65,9 +65,15 @@ proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} = return %RestContentList.init(content) +proc isPending(resp: HttpResponseRef): bool = + ## Checks that an HttpResponseRef object is still pending; i.e., + ## that no body has yet been sent. This helps us guard against calling + ## sendBody(resp: HttpResponseRef, ...) twice, which is illegal. + return resp.getResponseState() == HttpResponseState.Empty + proc retrieveCid( node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef -): Future[RestApiResponse] {.async.} = +): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = ## Download a file from the node in a streaming ## manner ## @@ -79,16 +85,21 @@ proc retrieveCid( without stream =? (await node.retrieve(cid, local)), error: if error of BlockNotFoundError: resp.status = Http404 - return await resp.sendBody("") + await resp.sendBody( + "The requested CID could not be retrieved (" & error.msg & ")." + ) + return else: resp.status = Http500 - return await resp.sendBody(error.msg) + await resp.sendBody(error.msg) + return # It is ok to fetch again the manifest because it will hit the cache without manifest =? (await node.fetchManifest(cid)), err: error "Failed to fetch manifest", err = err.msg resp.status = Http404 - return await resp.sendBody(err.msg) + await resp.sendBody(err.msg) + return if manifest.mimetype.isSome: resp.setHeader("Content-Type", manifest.mimetype.get()) @@ -119,10 +130,13 @@ proc retrieveCid( await resp.sendChunk(addr buff[0], buff.len) await resp.finish() codex_api_downloads.inc() + except CancelledError as exc: + raise exc except CatchableError as exc: warn "Error streaming blocks", exc = exc.msg resp.status = Http500 - return await resp.sendBody("") + if resp.isPending(): + await resp.sendBody(exc.msg) finally: info "Sent bytes", cid = cid, bytes if not stream.isNil: diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 2311dc22..20bf8bc8 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,4 +1,6 @@ import std/httpclient +import std/importutils +import std/net import std/sequtils import std/strformat from pkg/libp2p import `==`, `$`, Cid @@ -305,3 +307,31 @@ twonodessuite "REST API": let cid = Manifest.example().makeManifestBlock().get.cid let response = client1.deleteRaw($cid) check response.status == "204 No Content" + + test "should not crash if the download stream is closed before download completes", + twoNodesConfig: + privateAccess(client1.type) + privateAccess(client1.http.type) + + let cid = client1.upload(repeat("some file contents", 1000)).get + + try: + # Sadly, there's no high level API for preventing the client from + # consuming the whole response, and we need to close the socket + # before that happens if we want to trigger the bug, so we need to + # resort to this. + client1.http.getBody = false + let response = client1.downloadRaw($cid) + + # Read 4 bytes from the stream just to make sure we actually + # receive some data. + let data = client1.http.socket.recv(4) + check data.len == 4 + + # Prematurely closes the connection. + client1.http.close() + finally: + client1.http.getBody = true + + let response = client1.downloadRaw($cid) + check response.body == repeat("some file contents", 1000) From 703921df322e2c32b4f12786fc48e30989b025ca Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 10 Mar 2025 16:59:24 +0100 Subject: [PATCH 7/7] chore(restapi): add headers to support on progress when downloading (#1150) * Add headers to support on progress on download * Replace http session by http client in downloadBytes * Use int instead of int64 for datasetSize * Rename variable to avoid shallowing client --- codex/rest/api.nim | 3 +++ tests/integration/codexclient.nim | 12 ++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 6b8f2ac1..7cb0b43f 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -114,6 +114,8 @@ proc retrieveCid( else: resp.setHeader("Content-Disposition", "attachment") + resp.setHeader("Content-Length", $manifest.datasetSize.int) + await resp.prepareChunked() while not stream.atEof: @@ -342,6 +344,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute resp.setCorsHeaders("GET", corsOrigin) resp.setHeader("Access-Control-Headers", "X-Requested-With") + resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") await node.retrieveCid(cid.get(), local = false, resp = resp) router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 287f465f..f4c3f977 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -76,15 +76,15 @@ proc downloadNoStream*(client: CodexClient, cid: Cid): ?!string = proc downloadBytes*( client: CodexClient, cid: Cid, local = false ): Future[?!seq[byte]] {.async.} = - let uri = - parseUri(client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")) + let uri = client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream") - let (status, bytes) = await client.session.fetch(uri) + let httpClient = newHttpClient() + let response = httpClient.get(uri) - if status != 200: - return failure("fetch failed with status " & $status) + if response.status != "200 OK": + return failure("fetch failed with status " & $response.status) - success bytes + success response.body.toBytes proc delete*(client: CodexClient, cid: Cid): ?!void = let