diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index ab1d4cc3..8bb4dd4a 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -65,7 +65,7 @@ type proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: - for cid in toSeq(b.pendingBlocks.wantList): + for cid in toSeq(b.pendingBlocks.wantListBlockCids): try: await b.discoveryQueue.put(cid) except CatchableError as exc: diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 2ad09bd0..64aa624c 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -11,16 +11,18 @@ import std/sequtils import std/sets import std/options import std/algorithm +import std/sugar import pkg/chronos import pkg/chronicles -import pkg/libp2p/[cid, switch] +import pkg/libp2p/[cid, switch, multihash, multicodec] import pkg/metrics import pkg/stint import ../../stores/blockstore -import ../../blocktype as bt +import ../../blocktype import ../../utils +import ../../merkletree import ../protobuf/blockexc import ../protobuf/presence @@ -77,12 +79,6 @@ type address*: EthAddress price*: UInt256 -proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = - ## Convenience method to check for entry prepense - ## - - a.anyIt( it.cid == b ) - # attach task scheduler to engine proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = b.taskQueue.pushOrUpdateNoWait(task).isOk() @@ -124,22 +120,30 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" -proc sendWantHave(b: BlockExcEngine, cid: Cid, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = - trace "Sending wantHave request to peers", cid + +proc sendWantHave( + b: BlockExcEngine, + address: BlockAddress, + selectedPeer: BlockExcPeerCtx, + peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = + trace "Sending wantHave request to peers", address for p in peers: if p != selectedPeer: - if cid notin p.peerHave: + if address notin p.peerHave: trace " wantHave > ", peer = p.id await b.network.request.sendWantList( p.id, - @[cid], + @[address], wantType = WantType.WantHave) # we only want to know if the peer has the block -proc sendWantBlock(b: BlockExcEngine, cid: Cid, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = - trace "Sending wantBlock request to", peer = blockPeer.id, cid +proc sendWantBlock( + b: BlockExcEngine, + address: BlockAddress, + blockPeer: BlockExcPeerCtx): Future[void] {.async.} = + trace "Sending wantBlock request to", peer = blockPeer.id, address await b.network.request.sendWantList( blockPeer.id, - @[cid], + @[address], wantType = WantType.WantBlock) # we want this remote to send us a block proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeerCtx]): ?BlockExcPeerCtx = @@ -152,64 +156,63 @@ proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeer return some(peers[0]) return some(cheapestPeers[0]) # get cheapest +proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId) {.async.} = + try: + trace "Monitoring block handle", address, peerId + discard await handle + trace "Block handle success", address, peerId + except CatchableError as exc: + trace "Error block handle, disconnecting peer", address, exc = exc.msg, peerId + + # TODO: really, this is just a quick and dirty way of + # preventing hitting the same "bad" peer every time, 
however,
+    # we might as well discover this on our next iteration, so
+    # it doesn't mean that we're never talking to this peer again.
+    # TODO: we need a lot more work around peer selection and
+    # prioritization
+
+    # drop unresponsive peer
+    b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
+    await b.network.switch.disconnect(peerId)
+
+proc requestBlock*(
+  b: BlockExcEngine,
+  address: BlockAddress,
+  timeout = DefaultBlockTimeout
+): Future[Block] {.async.} =
+  let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)
+
+  if b.pendingBlocks.isInFlight(address):
+    return await blockFuture
+
+  let peers = b.peers.selectCheapest(address)
+  if peers.len == 0:
+    b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
+
+  let maybePeer =
+    if peers.len > 0:
+      peers[hash(address) mod peers.len].some
+    elif b.peers.len > 0:
+      toSeq(b.peers)[hash(address) mod b.peers.len].some
+    else:
+      BlockExcPeerCtx.none
+
+  if peer =? maybePeer:
+    asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
+    b.pendingBlocks.setInFlight(address)
+    await b.sendWantBlock(address, peer)
+    codex_block_exchange_want_block_lists_sent.inc()
+    await b.sendWantHave(address, peer, toSeq(b.peers))
+    codex_block_exchange_want_have_lists_sent.inc()
+
+  return await blockFuture
+
 proc requestBlock*(
   b: BlockExcEngine,
   cid: Cid,
-  timeout = DefaultBlockTimeout): Future[bt.Block] {.async.} =
-  trace "Begin block request", cid, peers = b.peers.len
-
-  if b.pendingBlocks.isInFlight(cid):
-    trace "Request handle already pending", cid
-    return await b.pendingBlocks.getWantHandle(cid, timeout)
-
-  let
-    blk = b.pendingBlocks.getWantHandle(cid, timeout)
-
-  trace "Selecting peers who have", cid
-  var
-    peers = b.peers.selectCheapest(cid)
-
-  without blockPeer =? b.findCheapestPeerForBlock(peers):
-    trace "No peers to request blocks from. Queue discovery...", cid
-    b.discovery.queueFindBlocksReq(@[cid])
-    return await blk
-
-  proc blockHandleMonitor() {.async.} =
-    try:
-      trace "Monitoring block handle", cid
-      b.pendingBlocks.setInFlight(cid, true)
-      discard await blk
-      trace "Block handle success", cid
-    except CatchableError as exc:
-      trace "Error block handle, disconnecting peer", cid, exc = exc.msg
-
-      # TODO: really, this is just a quick and dirty way of
-      # preventing hitting the same "bad" peer every time, however,
-      # we might as well discover this on or next iteration, so
-      # it doesn't mean that we're never talking to this peer again.
-      # TODO: we need a lot more work around peer selection and
-      # prioritization
-
-      # drop unresponsive peer
-      await b.network.switch.disconnect(blockPeer.id)
-
-  # monitor block handle
-  asyncSpawn blockHandleMonitor()
-
-  await b.sendWantBlock(cid, blockPeer)
-
-  codex_block_exchange_want_block_lists_sent.inc()
-
-  if (peers.len - 1) == 0:
-    trace "No peers to send want list to", cid
-    b.discovery.queueFindBlocksReq(@[cid])
-    return await blk
-
-  await b.sendWantHave(cid, blockPeer, toSeq(b.peers))
-
-  codex_block_exchange_want_have_lists_sent.inc()
-
-  return await blk
+  timeout = DefaultBlockTimeout
+): Future[Block] =
+  b.requestBlock(BlockAddress.init(cid))
 
 proc blockPresenceHandler*(
   b: BlockExcEngine,
@@ -226,7 +229,7 @@ proc blockPresenceHandler*(
 
   for blk in blocks:
     if presence =? 
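
Note: `requestBlock` is now address-based, so one code path serves both whole-CID fetches and Merkle-leaf fetches; the `cid` overload above is just a thin wrapper. A minimal caller-side sketch, assuming an already-wired `engine: BlockExcEngine` and a dataset root `treeCid` (both illustrative fixtures, not part of this patch):

```nim
import pkg/chronos
import pkg/libp2p/cid

proc fetchLeaf(engine: BlockExcEngine, treeCid: Cid, index: Natural): Future[Block] {.async.} =
  # Leaves are addressed by (treeCid, index) instead of their own CID.
  # requestBlock dedups concurrent callers: whoever finds the address
  # already in flight simply awaits the shared pending-blocks handle.
  let address = BlockAddress.init(treeCid, index)
  return await engine.requestBlock(address)
```

The `hash(address) mod peers.len` pick is deterministic per address, so concurrent requests for different leaves under one root spread across the peers advertising that root instead of piling onto a single peer.
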
Presence.init(blk): logScope: - cid = presence.cid + address = $presence.address have = presence.have price = presence.price @@ -255,22 +258,22 @@ proc blockPresenceHandler*( # if none of the connected peers report our wants in their have list, # fire up discovery b.discovery.queueFindBlocksReq( - toSeq(b.pendingBlocks.wantList) + toSeq(b.pendingBlocks.wantListCids) .filter do(cid: Cid) -> bool: - not b.peers.anyIt( cid in it.peerHave )) + not b.peers.anyIt( cid in it.peerHaveCids )) -proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = - trace "Schedule a task for new blocks", items = blocks.len +proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Schedule a task for new blocks", items = blocksDelivery.len let - cids = blocks.mapIt( it.cid ) + cids = blocksDelivery.mapIt( it.blk.cid ) # schedule any new peers to provide blocks to for p in b.peers: for c in cids: # for each cid # schedule a peer if it wants at least one cid # and we have it in our local store - if c in p.peerWants: + if c in p.peerWantsCids: if await (c in b.localStore): if b.scheduleTask(p): trace "Task scheduled for peer", peer = p.id @@ -279,50 +282,110 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = break # do next peer -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = - trace "Resolving blocks", blocks = blocks.len +proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Resolving blocks", blocks = blocksDelivery.len - b.pendingBlocks.resolve(blocks) - await b.scheduleTasks(blocks) - b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) + b.pendingBlocks.resolve(blocksDelivery) + await b.scheduleTasks(blocksDelivery) + var cids = initHashSet[Cid]() + for bd in blocksDelivery: + cids.incl(bd.blk.cid) + if bd.address.leaf: + cids.incl(bd.address.treeCid) + b.discovery.queueProvideBlocksReq(cids.toSeq) + +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = + await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)))) proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, - blocks: seq[bt.Block]) {.async.} = - trace "Paying for blocks", blocks = blocks.len + blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Paying for blocks", len = blocksDelivery.len let sendPayment = engine.network.request.sendPayment - price = peer.price(blocks.mapIt(it.cid)) + price = peer.price(blocksDelivery.mapIt(it.address)) if payment =? engine.wallet.pay(peer, price): trace "Sending payment for blocks", price await sendPayment(peer.id, payment) -proc blocksHandler*( +proc validateBlockDelivery( + b: BlockExcEngine, + bd: BlockDelivery +): ?!void = + if bd.address notin b.pendingBlocks: + return failure("Received block is not currently a pending block") + + if bd.address.leaf: + without proof =? bd.proof: + return failure("Missing proof") + + if proof.index != bd.address.index: + return failure("Proof index " & $proof.index & " doesn't match leaf index " & $bd.address.index) + + without leaf =? bd.blk.cid.mhash.mapFailure, err: + return failure("Unable to get mhash from cid for block, nested err: " & err.msg) + + without treeRoot =? bd.address.treeCid.mhash.mapFailure, err: + return failure("Unable to get mhash from treeCid for block, nested err: " & err.msg) + + without verifyOutcome =? 
proof.verifyLeaf(leaf, treeRoot), err: + return failure("Unable to verify proof for block, nested err: " & err.msg) + + if not verifyOutcome: + return failure("Provided inclusion proof is invalid") + else: # not leaf + if bd.address.cid != bd.blk.cid: + return failure("Delivery cid " & $bd.address.cid & " doesn't match block cid " & $bd.blk.cid) + + return success() + +proc blocksDeliveryHandler*( b: BlockExcEngine, peer: PeerId, - blocks: seq[bt.Block]) {.async.} = - trace "Got blocks from peer", peer, len = blocks.len - for blk in blocks: - if isErr (await b.localStore.putBlock(blk)): - trace "Unable to store block", cid = blk.cid + blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Got blocks from peer", peer, len = blocksDelivery.len - await b.resolveBlocks(blocks) - codex_block_exchange_blocks_received.inc(blocks.len.int64) + var validatedBlocksDelivery: seq[BlockDelivery] + for bd in blocksDelivery: + logScope: + peer = peer + address = bd.address + + if err =? b.validateBlockDelivery(bd).errorOption: + warn "Block validation failed", msg = err.msg + continue + + if err =? (await b.localStore.putBlock(bd.blk)).errorOption: + error "Unable to store block", err = err.msg + continue + + if bd.address.leaf: + without proof =? bd.proof: + error "Proof expected for a leaf block delivery" + continue + if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption: + error "Unable to store proof and cid for a block" + continue + + validatedBlocksDelivery.add(bd) + + await b.resolveBlocks(validatedBlocksDelivery) + codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) let peerCtx = b.peers.get(peer) if peerCtx != nil: - await b.payForBlocks(peerCtx, blocks) + await b.payForBlocks(peerCtx, blocksDelivery) ## shouldn't we remove them from the want-list instead of this: - peerCtx.cleanPresence(blocks.mapIt( it.cid )) + peerCtx.cleanPresence(blocksDelivery.mapIt( it.address )) proc wantListHandler*( b: BlockExcEngine, peer: PeerId, - wantList: Wantlist) {.async.} = + wantList: WantList) {.async.} = trace "Got wantList for peer", peer, items = wantList.entries.len let peerCtx = b.peers.get(peer) @@ -338,14 +401,14 @@ proc wantListHandler*( logScope: peer = peerCtx.id - cid = e.cid + address = e.address wantType = $e.wantType if idx < 0: # updating entry - trace "Processing new want list entry", cid = e.cid + trace "Processing new want list entry" let - have = await e.cid in b.localStore + have = await e.address in b.localStore price = @( b.pricing.get(Pricing(price: 0.u256)) .price.toBytesBE) @@ -354,21 +417,21 @@ proc wantListHandler*( codex_block_exchange_want_have_lists_received.inc() if not have and e.sendDontHave: - trace "Adding dont have entry to presence response", cid = e.cid + trace "Adding dont have entry to presence response" presence.add( BlockPresence( - cid: e.cid.data.buffer, + address: e.address, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to presence response", cid = e.cid + trace "Adding have entry to presence response" presence.add( BlockPresence( - cid: e.cid.data.buffer, + address: e.address, `type`: BlockPresenceType.Have, price: price)) elif e.wantType == WantType.WantBlock: - trace "Added entry to peer's want blocks list", cid = e.cid + trace "Added entry to peer's want blocks list" peerCtx.peerWants.add(e) codex_block_exchange_want_block_lists_received.inc() else: @@ -424,6 +487,8 @@ proc setupPeer*(b: 
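
Note: `validateBlockDelivery` is the gatekeeper for the new leaf deliveries. A condensed restatement of its leaf branch as a boolean predicate — a sketch only, reusing the exact calls from the proc above (`mapFailure`, `verifyLeaf`) and assuming the same module imports:

```nim
proc isValidLeafDelivery(bd: BlockDelivery): bool =
  ## Sketch: mirrors the leaf branch of validateBlockDelivery.
  without proof =? bd.proof:
    return false                      # leaf deliveries must carry a proof
  if proof.index != bd.address.index:
    return false                      # proof must target the delivered slot
  without leaf =? bd.blk.cid.mhash.mapFailure:
    return false                      # block multihash = the claimed leaf
  without root =? bd.address.treeCid.mhash.mapFailure:
    return false                      # root to verify inclusion against
  without ok =? proof.verifyLeaf(leaf, root):
    return false
  ok
```

Non-leaf deliveries only need the cheaper `bd.address.cid == bd.blk.cid` comparison, because `Block.new(cid, data, verify = true)` already re-hashed the payload when the message was decoded.
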
BlockExcEngine, peer: PeerId) {.async.} = ## list exchange ## + trace "Setting up peer", peer + if peer notin b.peers: trace "Setting up new peer", peer b.peers.add(BlockExcPeerCtx( @@ -432,9 +497,11 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = trace "Added peer", peers = b.peers.len # broadcast our want list, the other peer will do the same - if b.pendingBlocks.len > 0: + if b.pendingBlocks.wantListLen > 0: + trace "Sending our want list to a peer", peer + let cids = toSeq(b.pendingBlocks.wantList) await b.network.request.sendWantList( - peer, toSeq(b.pendingBlocks.wantList), full = true) + peer, cids, full = true) if address =? b.pricing.?address: await b.network.request.sendAccount(peer, Account(address: address)) @@ -468,30 +535,41 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = wantsBlocks.sort(SortOrder.Descending) + proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = + trace "Handling lookup for entry", address = e.address + if e.address.leaf: + (await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( + (blkAndProof: (Block, MerkleProof)) => + BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some) + ) + else: + (await b.localStore.getBlock(e.address)).map( + (blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: MerkleProof.none) + ) + let - blockFuts = await allFinished(wantsBlocks.mapIt( - b.localStore.getBlock(it.cid) - )) + blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup)) # Extract successfully received blocks let - blocks = blockFuts + blocksDelivery = blocksDeliveryFut .filterIt(it.completed and it.read.isOk) .mapIt(it.read.get) - if blocks.len > 0: - trace "Sending blocks to peer", peer = task.id, blocks = blocks.len - await b.network.request.sendBlocks( + if blocksDelivery.len > 0: + trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len + await b.network.request.sendBlocksDelivery( task.id, - blocks) + blocksDelivery + ) - codex_block_exchange_blocks_sent.inc(blocks.len.int64) + codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) - trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len + trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len # Remove successfully sent blocks task.peerWants.keepIf( - proc(e: Entry): bool = - not blocks.anyIt( it.cid == e.cid ) + proc(e: WantListEntry): bool = + not blocksDelivery.anyIt( it.address == e.address ) ) trace "Removed entries from peerWants", items = task.peerWants.len @@ -547,7 +625,7 @@ proc new*( proc blockWantListHandler( peer: PeerId, - wantList: Wantlist): Future[void] {.gcsafe.} = + wantList: WantList): Future[void] {.gcsafe.} = engine.wantListHandler(peer, wantList) proc blockPresenceHandler( @@ -555,10 +633,10 @@ proc new*( presence: seq[BlockPresence]): Future[void] {.gcsafe.} = engine.blockPresenceHandler(peer, presence) - proc blocksHandler( + proc blocksDeliveryHandler( peer: PeerId, - blocks: seq[bt.Block]): Future[void] {.gcsafe.} = - engine.blocksHandler(peer, blocks) + blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} = + engine.blocksDeliveryHandler(peer, blocksDelivery) proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = engine.accountHandler(peer, account) @@ -568,7 +646,7 @@ proc new*( network.handlers = BlockExcHandlers( onWantList: blockWantListHandler, - onBlocks: blocksHandler, + onBlocksDelivery: 
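
Note: the nested `localLookup` above is what makes a serving peer proof-capable: a leaf want is answered via `getBlockAndProof`, a plain want via `getBlock`. The two delivery shapes it produces, shown with illustrative fixtures (`treeT`, `leafBlk`, `leafProof`, `blk` are assumed, not patch code):

```nim
let leafDelivery = BlockDelivery(
  address: BlockAddress.init(treeT, 3),  # leaf 3 under tree root treeT
  blk: leafBlk,
  proof: leafProof.some)                 # receiver verifies inclusion

let wholeDelivery = BlockDelivery(
  address: BlockAddress.init(blk.cid),
  blk: blk,
  proof: MerkleProof.none)               # the CID alone authenticates
```
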
blocksDeliveryHandler, onPresence: blockPresenceHandler, onAccount: accountHandler, onPayment: paymentHandler) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index c1eaf22e..328f3ead 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -18,8 +18,11 @@ import pkg/chronicles import pkg/chronos import pkg/libp2p import pkg/metrics +import pkg/questionable/results +import ../protobuf/blockexc import ../../blocktype +import ../../merkletree logScope: topics = "codex pendingblocks" @@ -37,14 +40,14 @@ type startTime*: int64 PendingBlocksManager* = ref object of RootObj - blocks*: Table[Cid, BlockReq] # pending Block requests + blocks*: Table[BlockAddress, BlockReq] # pending Block requests proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) proc getWantHandle*( p: PendingBlocksManager, - cid: Cid, + address: BlockAddress, timeout = DefaultBlockTimeout, inFlight = false ): Future[Block] {.async.} = @@ -52,73 +55,106 @@ proc getWantHandle*( ## try: - if cid notin p.blocks: - p.blocks[cid] = BlockReq( + if address notin p.blocks: + p.blocks[address] = BlockReq( handle: newFuture[Block]("pendingBlocks.getWantHandle"), inFlight: inFlight, startTime: getMonoTime().ticks) - trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight + trace "Adding pending future for block", address, inFlight = p.blocks[address].inFlight p.updatePendingBlockGauge() - return await p.blocks[cid].handle.wait(timeout) + return await p.blocks[address].handle.wait(timeout) except CancelledError as exc: - trace "Blocks cancelled", exc = exc.msg, cid + trace "Blocks cancelled", exc = exc.msg, address raise exc except CatchableError as exc: trace "Pending WANT failed or expired", exc = exc.msg # no need to cancel, it is already cancelled by wait() raise exc finally: - p.blocks.del(cid) + p.blocks.del(address) p.updatePendingBlockGauge() -proc resolve*(p: PendingBlocksManager, - blocks: seq[Block]) = +proc getWantHandle*( + p: PendingBlocksManager, + cid: Cid, + timeout = DefaultBlockTimeout, + inFlight = false +): Future[Block] = + p.getWantHandle(BlockAddress.init(cid), timeout, inFlight) + +proc resolve*( + p: PendingBlocksManager, + blocksDelivery: seq[BlockDelivery] + ) {.gcsafe, raises: [].} = ## Resolve pending blocks ## - for blk in blocks: - # resolve any pending blocks - p.blocks.withValue(blk.cid, pending): - if not pending[].handle.completed: - trace "Resolving block", cid = blk.cid - pending[].handle.complete(blk) + for bd in blocksDelivery: + p.blocks.withValue(bd.address, blockReq): + trace "Resolving block", address = bd.address + + if not blockReq.handle.finished: let - startTime = pending[].startTime + startTime = blockReq.startTime stopTime = getMonoTime().ticks retrievalDurationUs = (stopTime - startTime) div 1000 + + blockReq.handle.complete(bd.blk) + codex_block_exchange_retrieval_time_us.set(retrievalDurationUs) - trace "Block retrieval time", retrievalDurationUs + trace "Block retrieval time", retrievalDurationUs, address = bd.address + else: + trace "Block handle already finished", address = bd.address + do: + warn "Attempting to resolve block that's not currently a pending block", address = bd.address proc setInFlight*(p: PendingBlocksManager, - cid: Cid, + address: BlockAddress, inFlight = true) = - p.blocks.withValue(cid, pending): + p.blocks.withValue(address, pending): pending[].inFlight = 
inFlight - trace "Setting inflight", cid, inFlight = pending[].inFlight + trace "Setting inflight", address, inFlight = pending[].inFlight proc isInFlight*(p: PendingBlocksManager, - cid: Cid + address: BlockAddress, ): bool = - p.blocks.withValue(cid, pending): + p.blocks.withValue(address, pending): result = pending[].inFlight - trace "Getting inflight", cid, inFlight = result - -proc pending*(p: PendingBlocksManager, cid: Cid): bool = - cid in p.blocks + trace "Getting inflight", address, inFlight = result proc contains*(p: PendingBlocksManager, cid: Cid): bool = - p.pending(cid) + BlockAddress.init(cid) in p.blocks + +proc contains*(p: PendingBlocksManager, address: BlockAddress): bool = + address in p.blocks + +iterator wantList*(p: PendingBlocksManager): BlockAddress = + for a in p.blocks.keys: + yield a + +iterator wantListBlockCids*(p: PendingBlocksManager): Cid = + for a in p.blocks.keys: + if not a.leaf: + yield a.cid + +iterator wantListCids*(p: PendingBlocksManager): Cid = + var yieldedCids = initHashSet[Cid]() + for a in p.blocks.keys: + let cid = a.cidOrTreeCid + if cid notin yieldedCids: + yieldedCids.incl(cid) + yield cid -iterator wantList*(p: PendingBlocksManager): Cid = - for k in p.blocks.keys: - yield k iterator wantHandles*(p: PendingBlocksManager): Future[Block] = for v in p.blocks.values: yield v.handle +proc wantListLen*(p: PendingBlocksManager): int = + p.blocks.len + func len*(p: PendingBlocksManager): int = p.blocks.len diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 88209f9f..0c7d2e2c 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -34,14 +34,14 @@ const MaxInflight* = 100 type - WantListHandler* = proc(peer: PeerId, wantList: Wantlist): Future[void] {.gcsafe.} - BlocksHandler* = proc(peer: PeerId, blocks: seq[bt.Block]): Future[void] {.gcsafe.} + WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} + BlocksDeliveryHandler* = proc(peer: PeerId, blocks: seq[BlockDelivery]): Future[void] {.gcsafe.} BlockPresenceHandler* = proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.} AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} WantListSender* = proc( id: PeerId, - cids: seq[Cid], + addresses: seq[BlockAddress], priority: int32 = 0, cancel: bool = false, wantType: WantType = WantType.WantHave, @@ -50,19 +50,19 @@ type BlockExcHandlers* = object onWantList*: WantListHandler - onBlocks*: BlocksHandler + onBlocksDelivery*: BlocksDeliveryHandler onPresence*: BlockPresenceHandler onAccount*: AccountHandler onPayment*: PaymentHandler - BlocksSender* = proc(peer: PeerId, presence: seq[bt.Block]): Future[void] {.gcsafe.} + BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} PaymentSender* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} BlockExcRequest* = object sendWantList*: WantListSender - sendBlocks*: BlocksSender + sendBlocksDelivery*: BlocksDeliverySender sendPresence*: PresenceSender sendAccount*: AccountSender sendPayment*: PaymentSender @@ -94,7 +94,7 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = proc handleWantList( b: 
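
Note: the pending-blocks manager now exposes three views over the same table, and each caller picks a different one: `wantList` yields every pending `BlockAddress`, `wantListBlockCids` only the non-leaf CIDs (what the discovery loop queues for DHT lookups), and `wantListCids` one CID per dataset, deduplicated through a `HashSet`. A hedged sketch with assumed fixture CIDs `cidC` (a plain block) and `treeT` (a tree root):

```nim
import std/sequtils
import std/sets

let p = PendingBlocksManager()
discard p.getWantHandle(BlockAddress.init(cidC))      # whole block
discard p.getWantHandle(BlockAddress.init(treeT, 0))  # leaf 0 of treeT
discard p.getWantHandle(BlockAddress.init(treeT, 1))  # leaf 1 of treeT

doAssert toSeq(p.wantList).len == 3             # every pending address
doAssert toSeq(p.wantListBlockCids) == @[cidC]  # leaves excluded
doAssert toSeq(p.wantListCids).toHashSet ==
  [cidC, treeT].toHashSet                       # one entry per dataset
```
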
BlockExcNetwork, peer: NetworkPeer, - list: Wantlist) {.async.} = + list: WantList) {.async.} = ## Handle incoming want list ## @@ -102,32 +102,10 @@ proc handleWantList( trace "Handling want list for peer", peer = peer.id, items = list.entries.len await b.handlers.onWantList(peer.id, list) -# TODO: make into a template -proc makeWantList*( - cids: seq[Cid], - priority: int = 0, - cancel: bool = false, - wantType: WantType = WantType.WantHave, - full: bool = false, - sendDontHave: bool = false -): Wantlist = - ## make list of wanted entries - ## - - Wantlist( - entries: cids.mapIt( - Entry( - `block`: it.data.buffer, - priority: priority.int32, - cancel: cancel, - wantType: wantType, - sendDontHave: sendDontHave) ), - full: full) - proc sendWantList*( b: BlockExcNetwork, id: PeerId, - cids: seq[Cid], + addresses: seq[BlockAddress], priority: int32 = 0, cancel: bool = false, wantType: WantType = WantType.WantHave, @@ -137,58 +115,40 @@ proc sendWantList*( ## Send a want message to peer ## - trace "Sending want list to peer", peer = id, `type` = $wantType, items = cids.len - let msg = makeWantList( - cids, - priority, - cancel, - wantType, - full, - sendDontHave) - + trace "Sending want list to peer", peer = id, `type` = $wantType, items = addresses.len + let msg = WantList( + entries: addresses.mapIt( + WantListEntry( + address: it, + priority: priority, + cancel: cancel, + wantType: wantType, + sendDontHave: sendDontHave) ), + full: full) + b.send(id, Message(wantlist: msg)) -proc handleBlocks( +proc handleBlocksDelivery( b: BlockExcNetwork, peer: NetworkPeer, - blocks: seq[pb.Block] + blocksDelivery: seq[BlockDelivery] ) {.async.} = ## Handle incoming blocks ## - if not b.handlers.onBlocks.isNil: - trace "Handling blocks for peer", peer = peer.id, items = blocks.len + if not b.handlers.onBlocksDelivery.isNil: + trace "Handling blocks for peer", peer = peer.id, items = blocksDelivery.len + await b.handlers.onBlocksDelivery(peer.id, blocksDelivery) - var blks: seq[bt.Block] - for blob in blocks: - without cid =? Cid.init(blob.prefix): - trace "Unable to initialize Cid from protobuf message" - without blk =? 
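
Note: with `makeWantList` gone, `sendWantList` builds the `WantList` inline; every entry in a single call shares the same priority/cancel/wantType/sendDontHave flags. A typical have-probe for two leaves, assuming a connected `network: BlockExcNetwork` and illustrative `peerId`/`treeT` fixtures:

```nim
await network.sendWantList(
  peerId,
  @[BlockAddress.init(treeT, 0), BlockAddress.init(treeT, 1)],
  wantType = WantType.WantHave,  # presence only, don't ship the data
  sendDontHave = true)           # request explicit DontHave answers
```
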
bt.Block.new(cid, blob.data, verify = true): - trace "Unable to initialize Block from data" - - blks.add(blk) - - await b.handlers.onBlocks(peer.id, blks) - -template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = - var blks: seq[pb.Block] - for blk in blocks: - blks.add(pb.Block( - prefix: blk.cid.data.buffer, - data: blk.data - )) - - blks - -proc sendBlocks*( +proc sendBlocksDelivery*( b: BlockExcNetwork, id: PeerId, - blocks: seq[bt.Block]): Future[void] = + blocksDelivery: seq[BlockDelivery]): Future[void] = ## Send blocks to remote ## - b.send(id, pb.Message(payload: makeBlocks(blocks))) + b.send(id, pb.Message(payload: blocksDelivery)) proc handleBlockPresence( b: BlockExcNetwork, @@ -260,11 +220,11 @@ proc rpcHandler( ## handle rpc messages ## try: - if msg.wantlist.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantlist) + if msg.wantList.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantList) if msg.payload.len > 0: - asyncSpawn b.handleBlocks(peer, msg.payload) + asyncSpawn b.handleBlocksDelivery(peer, msg.payload) if msg.blockPresences.len > 0: asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) @@ -359,7 +319,7 @@ proc new*( proc sendWantList( id: PeerId, - cids: seq[Cid], + cids: seq[BlockAddress], priority: int32 = 0, cancel: bool = false, wantType: WantType = WantType.WantHave, @@ -369,8 +329,8 @@ proc new*( id, cids, priority, cancel, wantType, full, sendDontHave) - proc sendBlocks(id: PeerId, blocks: seq[bt.Block]): Future[void] {.gcsafe.} = - self.sendBlocks(id, blocks) + proc sendBlocksDelivery(id: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} = + self.sendBlocksDelivery(id, blocksDelivery) proc sendPresence(id: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} = self.sendBlockPresence(id, presence) @@ -383,7 +343,7 @@ proc new*( self.request = BlockExcRequest( sendWantList: sendWantList, - sendBlocks: sendBlocks, + sendBlocksDelivery: sendBlocksDelivery, sendPresence: sendPresence, sendAccount: sendAccount, sendPayment: sendPayment) diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 6c9eac1c..66418ddd 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -9,6 +9,8 @@ import std/sequtils import std/tables +import std/sugar +import std/sets import pkg/chronicles import pkg/libp2p @@ -20,6 +22,8 @@ import ../protobuf/blockexc import ../protobuf/payments import ../protobuf/presence +import ../../blocktype + export payments, nitro logScope: @@ -28,33 +32,39 @@ logScope: type BlockExcPeerCtx* = ref object of RootObj id*: PeerId - blocks*: Table[Cid, Presence] # remote peer have list including price - peerWants*: seq[Entry] # remote peers want lists + blocks*: Table[BlockAddress, Presence] # remote peer have list including price + peerWants*: seq[WantListEntry] # remote peers want lists exchanged*: int # times peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id -proc peerHave*(self: BlockExcPeerCtx): seq[Cid] = +proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] = toSeq(self.blocks.keys) -proc contains*(self: BlockExcPeerCtx, cid: Cid): bool = - cid in self.blocks +proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] = + self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet + +proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] = + 
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet + +proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool = + address in self.blocks func setPresence*(self: BlockExcPeerCtx, presence: Presence) = - self.blocks[presence.cid] = presence + self.blocks[presence.address] = presence -func cleanPresence*(self: BlockExcPeerCtx, cids: seq[Cid]) = - for cid in cids: - self.blocks.del(cid) +func cleanPresence*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]) = + for a in addresses: + self.blocks.del(a) -func cleanPresence*(self: BlockExcPeerCtx, cid: Cid) = - self.cleanPresence(@[cid]) +func cleanPresence*(self: BlockExcPeerCtx, address: BlockAddress) = + self.cleanPresence(@[address]) -func price*(self: BlockExcPeerCtx, cids: seq[Cid]): UInt256 = +func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 = var price = 0.u256 - for cid in cids: - self.blocks.withValue(cid, precense): + for a in addresses: + self.blocks.withValue(a, precense): price += precense[].price trace "Blocks price", price diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 340d3ee0..f23415f6 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -20,6 +20,7 @@ import pkg/chronicles import pkg/libp2p import ../protobuf/blockexc +import ../../blocktype import ./peercontext export peercontext @@ -59,24 +60,32 @@ func get*(self: PeerCtxStore, peerId: PeerId): BlockExcPeerCtx = func len*(self: PeerCtxStore): int = self.peers.len +func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = + toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == address ) ) + func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == cid ) ) + toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it.cidOrTreeCid == cid ) ) + +func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = + toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it == address ) ) func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.cid == cid ) ) + toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) ) -func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - var peers = self.peersHave(cid) +func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = + # assume that the price for all leaves in a tree is the same + let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid) + var peers = self.peersHave(rootAddress) func cmp(a, b: BlockExcPeerCtx): int = var priceA = 0.u256 priceB = 0.u256 - a.blocks.withValue(cid, precense): + a.blocks.withValue(rootAddress, precense): priceA = precense[].price - b.blocks.withValue(cid, precense): + b.blocks.withValue(rootAddress, precense): priceB = precense[].price if priceA == priceB: diff --git a/codex/blockexchange/protobuf/blockexc.nim b/codex/blockexchange/protobuf/blockexc.nim index 43a9dff1..d511ea82 100644 --- a/codex/blockexchange/protobuf/blockexc.nim +++ b/codex/blockexchange/protobuf/blockexc.nim @@ -10,46 +10,45 @@ import std/hashes import std/sequtils import pkg/libp2p +import pkg/stew/endians2 import message +import ../../blocktype + export Message, protobufEncode, protobufDecode -export Wantlist, WantType, Entry -export Block, BlockPresenceType, BlockPresence +export Wantlist, WantType, 
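
Note: `selectCheapest` deliberately prices a leaf through its dataset root: per the comment above, one price is assumed for all leaves of a tree, so presences are looked up under `BlockAddress(leaf: false, cid: address.cidOrTreeCid)`. An illustration — assuming a `store` with two peers that advertised only the root, and assuming the ascending price sort in the unshown tail of this proc:

```nim
# peerA: Presence(address: root of treeT, price: 2.u256)
# peerB: Presence(address: root of treeT, price: 1.u256)
let candidates = store.selectCheapest(BlockAddress.init(treeT, 5))
doAssert candidates.len == 2            # root presence covers any leaf
doAssert candidates[0].id == peerB.id   # cheapest root price ranks first
```
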
WantListEntry +export BlockDelivery, BlockPresenceType, BlockPresence export AccountMessage, StateChannelUpdate -proc hash*(e: Entry): Hash = - hash(e.`block`) +proc hash*(a: BlockAddress): Hash = + if a.leaf: + let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE) + hash(data) + else: + hash(a.cid.data.buffer) -proc cid*(e: Entry): Cid = - ## Helper to convert raw bytes to Cid - ## +proc hash*(e: WantListEntry): Hash = + hash(e.address) - Cid.init(e.`block`).get() - -proc contains*(a: openArray[Entry], b: Cid): bool = +proc contains*(a: openArray[WantListEntry], b: BlockAddress): bool = ## Convenience method to check for peer precense ## - a.filterIt( it.cid == b ).len > 0 + a.anyIt(it.address == b) -proc `==`*(a: Entry, cid: Cid): bool = - return a.cid == cid +proc `==`*(a: WantListEntry, b: BlockAddress): bool = + return a.address == b -proc `<`*(a, b: Entry): bool = +proc `<`*(a, b: WantListEntry): bool = a.priority < b.priority -proc cid*(e: BlockPresence): Cid = - ## Helper to convert raw bytes to Cid - ## - Cid.init(e.cid).get() +proc `==`*(a: BlockPresence, b: BlockAddress): bool = + return a.address == b -proc `==`*(a: BlockPresence, cid: Cid): bool = - return cid(a) == cid - -proc contains*(a: openArray[BlockPresence], b: Cid): bool = +proc contains*(a: openArray[BlockPresence], b: BlockAddress): bool = ## Convenience method to check for peer precense ## - a.filterIt( cid(it) == b ).len > 0 + a.anyIt(it.address == b) diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim index bbbfdf49..ffec0fcf 100644 --- a/codex/blockexchange/protobuf/message.nim +++ b/codex/blockexchange/protobuf/message.nim @@ -2,11 +2,18 @@ # and Protobuf encoder/decoder for these messages. # # Eventually all this code should be auto-generated from message.proto. +import std/sugar import pkg/libp2p/protobuf/minprotobuf +import pkg/libp2p/cid + +import pkg/questionable import ../../units +import ../../merkletree +import ../../blocktype + const MaxBlockSize* = 100.MiBs.uint MaxMessageSize* = 100.MiBs.uint @@ -16,27 +23,28 @@ type WantBlock = 0, WantHave = 1 - Entry* = object - `block`*: seq[byte] # The block cid + WantListEntry* = object + address*: BlockAddress priority*: int32 # The priority (normalized). default to 1 cancel*: bool # Whether this revokes an entry wantType*: WantType # Note: defaults to enum 0, ie Block sendDontHave*: bool # Note: defaults to false - Wantlist* = object - entries*: seq[Entry] # A list of wantlist entries - full*: bool # Whether this is the full wantlist. default to false + WantList* = object + entries*: seq[WantListEntry] # A list of wantList entries + full*: bool # Whether this is the full wantList. 
default to false - Block* = object - prefix*: seq[byte] # CID prefix (cid version, multicodec and multihash prefix (type + length) - data*: seq[byte] + BlockDelivery* = object + blk*: Block + address*: BlockAddress + proof*: ?MerkleProof # Present only if `address.leaf` is true BlockPresenceType* = enum Have = 0, DontHave = 1 BlockPresence* = object - cid*: seq[byte] # The block cid + address*: BlockAddress `type`*: BlockPresenceType price*: seq[byte] # Amount of assets to pay for the block (UInt256) @@ -47,8 +55,8 @@ type update*: seq[byte] # Signed Nitro state, serialized as JSON Message* = object - wantlist*: Wantlist - payload*: seq[Block] + wantList*: WantList + payload*: seq[BlockDelivery] blockPresences*: seq[BlockPresence] pendingBytes*: uint account*: AccountMessage @@ -58,9 +66,20 @@ type # Encoding Message into seq[byte] in Protobuf format # -proc write*(pb: var ProtoBuffer, field: int, value: Entry) = +proc write*(pb: var ProtoBuffer, field: int, value: BlockAddress) = var ipb = initProtoBuffer() - ipb.write(1, value.`block`) + ipb.write(1, value.leaf.uint) + if value.leaf: + ipb.write(2, value.treeCid.data.buffer) + ipb.write(3, value.index.uint64) + else: + ipb.write(4, value.cid.data.buffer) + ipb.finish() + pb.write(field, ipb) + +proc write*(pb: var ProtoBuffer, field: int, value: WantListEntry) = + var ipb = initProtoBuffer() + ipb.write(1, value.address) ipb.write(2, value.priority.uint64) ipb.write(3, value.cancel.uint) ipb.write(4, value.wantType.uint) @@ -68,7 +87,7 @@ proc write*(pb: var ProtoBuffer, field: int, value: Entry) = ipb.finish() pb.write(field, ipb) -proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) = +proc write*(pb: var ProtoBuffer, field: int, value: WantList) = var ipb = initProtoBuffer() for v in value.entries: ipb.write(1, v) @@ -76,16 +95,20 @@ proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) = ipb.finish() pb.write(field, ipb) -proc write*(pb: var ProtoBuffer, field: int, value: Block) = +proc write*(pb: var ProtoBuffer, field: int, value: BlockDelivery) = var ipb = initProtoBuffer(maxSize = MaxBlockSize) - ipb.write(1, value.prefix) - ipb.write(2, value.data) + ipb.write(1, value.blk.cid.data.buffer) + ipb.write(2, value.blk.data) + ipb.write(3, value.address) + if value.address.leaf: + if proof =? value.proof: + ipb.write(4, proof.encode()) ipb.finish() pb.write(field, ipb) proc write*(pb: var ProtoBuffer, field: int, value: BlockPresence) = var ipb = initProtoBuffer() - ipb.write(1, value.cid) + ipb.write(1, value.address) ipb.write(2, value.`type`.uint) ipb.write(3, value.price) ipb.finish() @@ -105,7 +128,7 @@ proc write*(pb: var ProtoBuffer, field: int, value: StateChannelUpdate) = proc protobufEncode*(value: Message): seq[byte] = var ipb = initProtoBuffer(maxSize = MaxMessageSize) - ipb.write(1, value.wantlist) + ipb.write(1, value.wantList) for v in value.payload: ipb.write(3, v) for v in value.blockPresences: @@ -120,12 +143,41 @@ proc protobufEncode*(value: Message): seq[byte] = # # Decoding Message from seq[byte] in Protobuf format # - -proc decode*(_: type Entry, pb: ProtoBuffer): ProtoResult[Entry] = +proc decode*(_: type BlockAddress, pb: ProtoBuffer): ProtoResult[BlockAddress] = var - value = Entry() + value: BlockAddress + leaf: bool field: uint64 - discard ? pb.getField(1, value.`block`) + cidBuf = newSeq[byte]() + + if ? pb.getField(1, field): + leaf = bool(field) + + if leaf: + var + treeCid: Cid + index: Natural + if ? pb.getField(2, cidBuf): + treeCid = ? 
Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob) + if ? pb.getField(3, field): + index = field + value = BlockAddress(leaf: true, treeCid: treeCid, index: index) + else: + var cid: Cid + if ? pb.getField(4, cidBuf): + cid = ? Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob) + value = BlockAddress(leaf: false, cid: cid) + + ok(value) + +proc decode*(_: type WantListEntry, pb: ProtoBuffer): ProtoResult[WantListEntry] = + var + value = WantListEntry() + field: uint64 + ipb: ProtoBuffer + buf = newSeq[byte]() + if ? pb.getField(1, ipb): + value.address = ? BlockAddress.decode(ipb) if ? pb.getField(2, field): value.priority = int32(field) if ? pb.getField(3, field): @@ -136,30 +188,53 @@ proc decode*(_: type Entry, pb: ProtoBuffer): ProtoResult[Entry] = value.sendDontHave = bool(field) ok(value) -proc decode*(_: type Wantlist, pb: ProtoBuffer): ProtoResult[Wantlist] = +proc decode*(_: type WantList, pb: ProtoBuffer): ProtoResult[WantList] = var - value = Wantlist() + value = WantList() field: uint64 sublist: seq[seq[byte]] if ? pb.getRepeatedField(1, sublist): for item in sublist: - value.entries.add(? Entry.decode(initProtoBuffer(item))) + value.entries.add(? WantListEntry.decode(initProtoBuffer(item))) if ? pb.getField(2, field): value.full = bool(field) ok(value) -proc decode*(_: type Block, pb: ProtoBuffer): ProtoResult[Block] = +proc decode*(_: type BlockDelivery, pb: ProtoBuffer): ProtoResult[BlockDelivery] = var - value = Block() - discard ? pb.getField(1, value.prefix) - discard ? pb.getField(2, value.data) + value = BlockDelivery() + field: uint64 + dataBuf = newSeq[byte]() + cidBuf = newSeq[byte]() + cid: Cid + ipb: ProtoBuffer + + if ? pb.getField(1, cidBuf): + cid = ? Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob) + if ? pb.getField(2, dataBuf): + value.blk = ? Block.new(cid, dataBuf, verify = true).mapErr(x => ProtoError.IncorrectBlob) + if ? pb.getField(3, ipb): + value.address = ? BlockAddress.decode(ipb) + + if value.address.leaf: + var proofBuf = newSeq[byte]() + if ? pb.getField(4, proofBuf): + let proof = ? MerkleProof.decode(proofBuf).mapErr(x => ProtoError.IncorrectBlob) + value.proof = proof.some + else: + value.proof = MerkleProof.none + else: + value.proof = MerkleProof.none + ok(value) proc decode*(_: type BlockPresence, pb: ProtoBuffer): ProtoResult[BlockPresence] = var value = BlockPresence() field: uint64 - discard ? pb.getField(1, value.cid) + ipb: ProtoBuffer + if ? pb.getField(1, ipb): + value.address = ? BlockAddress.decode(ipb) if ? pb.getField(2, field): value.`type` = BlockPresenceType(field) discard ? pb.getField(3, value.price) @@ -184,10 +259,10 @@ proc protobufDecode*(_: type Message, msg: seq[byte]): ProtoResult[Message] = ipb: ProtoBuffer sublist: seq[seq[byte]] if ? pb.getField(1, ipb): - value.wantlist = ? Wantlist.decode(ipb) + value.wantList = ? WantList.decode(ipb) if ? pb.getRepeatedField(3, sublist): for item in sublist: - value.payload.add(? Block.decode(initProtoBuffer(item, maxSize = MaxBlockSize))) + value.payload.add(? BlockDelivery.decode(initProtoBuffer(item, maxSize = MaxBlockSize))) if ? pb.getRepeatedField(4, sublist): for item in sublist: value.blockPresences.add(? 
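
Note: on the wire `BlockAddress` is a nested message — field 1 is the `leaf` flag, fields 2/3 carry `treeCid`/`index` for leaves, field 4 carries `cid` otherwise — so the unused branch costs nothing. A hedged round-trip check against the codec above (`treeT` is an assumed fixture; imports as in this module):

```nim
proc roundTrip(a: BlockAddress): ProtoResult[BlockAddress] =
  var pb = initProtoBuffer()
  pb.write(1, a)            # write* overload defined above
  pb.finish()
  var
    outer = initProtoBuffer(pb.buffer)
    inner: ProtoBuffer
  discard ? outer.getField(1, inner)
  BlockAddress.decode(inner)

let back = roundTrip(BlockAddress.init(treeT, 42)).tryGet()
doAssert back.leaf and back.index == 42 and back.treeCid == treeT
```
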
BlockPresence.decode(initProtoBuffer(item))) diff --git a/codex/blockexchange/protobuf/presence.nim b/codex/blockexchange/protobuf/presence.nim index 1a1c6c5c..2b07191d 100644 --- a/codex/blockexchange/protobuf/presence.nim +++ b/codex/blockexchange/protobuf/presence.nim @@ -5,6 +5,8 @@ import pkg/questionable/results import pkg/upraises import ./blockexc +import ../../blocktype + export questionable export stint export BlockPresenceType @@ -14,7 +16,7 @@ upraises.push: {.upraises: [].} type PresenceMessage* = blockexc.BlockPresence Presence* = object - cid*: Cid + address*: BlockAddress have*: bool price*: UInt256 @@ -24,19 +26,18 @@ func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 = UInt256.fromBytesBE(bytes).some func init*(_: type Presence, message: PresenceMessage): ?Presence = - without cid =? Cid.init(message.cid) and - price =? UInt256.parse(message.price): + without price =? UInt256.parse(message.price): return none Presence some Presence( - cid: cid, + address: message.address, have: message.`type` == BlockPresenceType.Have, price: price ) func init*(_: type PresenceMessage, presence: Presence): PresenceMessage = PresenceMessage( - cid: presence.cid.data.buffer, + address: presence.address, `type`: if presence.have: BlockPresenceType.Have else: diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 2484908f..080e308a 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -8,17 +8,19 @@ ## those terms. import std/tables +import std/sugar export tables import pkg/upraises push: {.upraises: [].} -import pkg/libp2p/[cid, multicodec] +import pkg/libp2p/[cid, multicodec, multihash] import pkg/stew/byteutils import pkg/questionable import pkg/questionable/results import pkg/chronicles +import pkg/json_serialization import ./units import ./utils @@ -37,91 +39,50 @@ type cid*: Cid data*: seq[byte] -template EmptyCid*: untyped = - var - EmptyCid {.global, threadvar.}: - array[CIDv0..CIDv1, Table[MultiCodec, Cid]] + BlockAddress* = object + case leaf*: bool + of true: + treeCid*: Cid + index*: Natural + else: + cid*: Cid - once: - EmptyCid = [ - CIDv0: { - multiCodec("sha2-256"): Cid - .init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n") - .get() - }.toTable, - CIDv1: { - multiCodec("sha2-256"): Cid - .init("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") - .get() - }.toTable, - ] - EmptyCid +proc `==`*(a, b: BlockAddress): bool = + a.leaf == b.leaf and + ( + if a.leaf: + a.treeCid == b.treeCid and a.index == b.index + else: + a.cid == b.cid + ) -template EmptyDigests*: untyped = - var - EmptyDigests {.global, threadvar.}: - array[CIDv0..CIDv1, Table[MultiCodec, MultiHash]] +proc `$`*(a: BlockAddress): string = + if a.leaf: + "treeCid: " & $a.treeCid & ", index: " & $a.index + else: + "cid: " & $a.cid - once: - EmptyDigests = [ - CIDv0: { - multiCodec("sha2-256"): EmptyCid[CIDv0] - .catch - .get()[multiCodec("sha2-256")] - .catch - .get() - .mhash - .get() - }.toTable, - CIDv1: { - multiCodec("sha2-256"): EmptyCid[CIDv1] - .catch - .get()[multiCodec("sha2-256")] - .catch - .get() - .mhash - .get() - }.toTable, - ] +proc writeValue*( + writer: var JsonWriter, + value: Cid +) {.upraises:[IOError].} = + writer.writeValue($value) - EmptyDigests +proc cidOrTreeCid*(a: BlockAddress): Cid = + if a.leaf: + a.treeCid + else: + a.cid -template EmptyBlock*: untyped = - var - EmptyBlock {.global, threadvar.}: - array[CIDv0..CIDv1, Table[MultiCodec, Block]] +proc address*(b: Block): BlockAddress = + BlockAddress(leaf: false, cid: b.cid) - once: - EmptyBlock = [ - 
CIDv0: { - multiCodec("sha2-256"): Block( - cid: EmptyCid[CIDv0][multiCodec("sha2-256")]) - }.toTable, - CIDv1: { - multiCodec("sha2-256"): Block( - cid: EmptyCid[CIDv1][multiCodec("sha2-256")]) - }.toTable, - ] +proc init*(_: type BlockAddress, cid: Cid): BlockAddress = + BlockAddress(leaf: false, cid: cid) - EmptyBlock - -proc isEmpty*(cid: Cid): bool = - cid == EmptyCid[cid.cidver] - .catch - .get()[cid.mhash.get().mcodec] - .catch - .get() - -proc isEmpty*(blk: Block): bool = - blk.cid.isEmpty - -proc emptyBlock*(cid: Cid): Block = - EmptyBlock[cid.cidver] - .catch - .get()[cid.mhash.get().mcodec] - .catch - .get() +proc init*(_: type BlockAddress, treeCid: Cid, index: Natural): BlockAddress = + BlockAddress(leaf: true, treeCid: treeCid, index: index) proc `$`*(b: Block): string = result &= "cid: " & $b.cid @@ -154,17 +115,58 @@ func new*( verify: bool = true ): ?!Block = ## creates a new block for both storage and network IO + ## + + if verify: + let + mhash = ? cid.mhash.mapFailure + computedMhash = ? MultiHash.digest($mhash.mcodec, data).mapFailure + computedCid = ? Cid.init(cid.cidver, cid.mcodec, computedMhash).mapFailure + if computedCid != cid: + return "Cid doesn't match the data".failure + + return Block( + cid: cid, + data: @data + ).success + +proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!Cid = + ## Returns cid representing empty content, given cid version, hash codec and data codec ## - let - mhash = ? cid.mhash.mapFailure - b = ? Block.new( - data = @data, - version = cid.cidver, - codec = cid.mcodec, - mcodec = mhash.mcodec) + const + Sha256 = multiCodec("sha2-256") + Raw = multiCodec("raw") + DagPB = multiCodec("dag-pb") + DagJson = multiCodec("dag-json") - if verify and cid != b.cid: - return "Cid and content don't match!".failure + var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid] + once: + index = { + # source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty + (CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure, + (CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku + (CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta + (CIDv1, Sha256, Raw): ? 
Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure, + }.toTable - success b + index[(version, hcodec, dcodec)].catch + +proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash = + emptyCid(version, hcodec, dcodec) + .flatMap((cid: Cid) => cid.mhash.mapFailure) + +proc emptyBlock*(version: CidVersion, hcodec: MultiCodec): ?!Block = + emptyCid(version, hcodec, multiCodec("raw")) + .flatMap((cid: Cid) => Block.new(cid = cid, data = @[])) + +proc emptyBlock*(cid: Cid): ?!Block = + cid.mhash.mapFailure.flatMap((mhash: MultiHash) => + emptyBlock(cid.cidver, mhash.mcodec)) + +proc isEmpty*(cid: Cid): bool = + success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) => + emptyCid(cid.cidver, mhash.mcodec, cid.mcodec)) + +proc isEmpty*(blk: Block): bool = + blk.cid.isEmpty diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 67358fb0..35109a99 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -12,15 +12,21 @@ import pkg/upraises push: {.upraises: [].} import std/sequtils -import std/options +import std/sugar import pkg/chronos import pkg/chronicles -import pkg/questionable +import pkg/libp2p/[multicodec, cid, multibase, multihash] +import pkg/libp2p/protobuf/minprotobuf import ../manifest +import ../merkletree import ../stores import ../blocktype as bt +import ../utils +import ../utils/asynciter + +import pkg/stew/byteutils import ./backend @@ -64,12 +70,14 @@ type decoderProvider*: DecoderProvider store*: BlockStore - GetNext = proc(): Future[?(bt.Block, int)] {.upraises: [], gcsafe, closure.} - PendingBlocksIter* = ref object - finished*: bool - next*: GetNext + EncodingParams = object + ecK: int + ecM: int + rounded: int + steps: int + blocksCount: int -func indexToPos(self: Erasure, encoded: Manifest, idx, step: int): int {.inline.} = +func indexToPos(steps, idx, step: int): int {.inline.} = ## Convert an index to a position in the encoded ## dataset ## `idx` - the index to convert @@ -77,93 +85,71 @@ func indexToPos(self: Erasure, encoded: Manifest, idx, step: int): int {.inline. 
## `pos` - the position in the encoded dataset ## - (idx - step) div encoded.steps - -iterator items*(blocks: PendingBlocksIter): Future[?(bt.Block, int)] = - while not blocks.finished: - yield blocks.next() + (idx - step) div steps proc getPendingBlocks( self: Erasure, manifest: Manifest, - start, stop, steps: int): ?!PendingBlocksIter = + indicies: seq[int]): AsyncIter[(?!bt.Block, int)] = ## Get pending blocks iterator ## var - # calculate block indexes to retrieve - blockIdx = toSeq(countup(start, stop, steps)) - # request all blocks from the store - pendingBlocks = blockIdx.mapIt( - self.store.getBlock(manifest[it]) # Get the data blocks (first K) + # request blocks from the store + pendingBlocks = indicies.map( (i: int) => + self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) ) - indices = pendingBlocks # needed so we can track the block indices - iter = PendingBlocksIter(finished: false) - trace "Requesting blocks", pendingBlocks = pendingBlocks.len - proc next(): Future[?(bt.Block, int)] {.async.} = - if iter.finished: - trace "No more blocks" - return none (bt.Block, int) + proc isFinished(): bool = pendingBlocks.len == 0 - if pendingBlocks.len == 0: - iter.finished = true - trace "No more blocks - finished" - return none (bt.Block, int) - - let - done = await one(pendingBlocks) - idx = indices.find(done) - - logScope: - idx = idx - blockIdx = blockIdx[idx] - manifest = manifest[blockIdx[idx]] - - pendingBlocks.del(pendingBlocks.find(done)) - without blk =? (await done), error: - trace "Failed retrieving block", err = $error.msg - return none (bt.Block, int) - - trace "Retrieved block" - some (blk, blockIdx[idx]) - - iter.next = next - success iter + proc genNext(): Future[(?!bt.Block, int)] {.async.} = + let completedFut = await one(pendingBlocks) + if (let i = pendingBlocks.find(completedFut); i >= 0): + pendingBlocks.del(i) + return await completedFut + else: + let (_, index) = await completedFut + raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) + Iter.new(genNext, isFinished) + proc prepareEncodingData( self: Erasure, - encoded: Manifest, + manifest: Manifest, + params: EncodingParams, step: int, data: ref seq[seq[byte]], + cids: ref seq[Cid], emptyBlock: seq[byte]): Future[?!int] {.async.} = ## Prepare data for encoding ## - without pendingBlocksIter =? - self.getPendingBlocks( - encoded, - step, - encoded.rounded - 1, encoded.steps), err: - trace "Unable to get pending blocks", error = err.msg - return failure(err) + let + indicies = toSeq(countup(step, params.rounded - 1, params.steps)) + pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) var resolved = 0 - for blkFut in pendingBlocksIter: - if (blk, idx) =? (await blkFut): - let - pos = self.indexToPos(encoded, idx, step) + for fut in pendingBlocksIter: + let (blkOrErr, idx) = await fut + without blk =? 
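
Note: encoding walks the dataset as `steps` interleaved columns — step `s` owns indices `s, s + steps, s + 2*steps, ...` — and `indexToPos` recovers a block's row within its column. A quick worked check with `steps = 3`, `step = 1`:

```nim
doAssert indexToPos(3, 1, 1) == 0  # (1 - 1) div 3
doAssert indexToPos(3, 4, 1) == 1  # (4 - 1) div 3
doAssert indexToPos(3, 7, 1) == 2  # (7 - 1) div 3
```
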
blkOrErr, err:
+      warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg
+      continue
+
+    let pos = indexToPos(params.steps, idx, step)
+    shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
+    cids[idx] = blk.cid
 
-      if blk.isEmpty:
-        trace "Padding with empty block", idx
-        shallowCopy(data[pos], emptyBlock)
-      else:
-        trace "Encoding block", cid = blk.cid, idx
-        shallowCopy(data[pos], blk.data)
+    resolved.inc()
 
-      resolved.inc()
+  for idx in indicies.filterIt(it >= manifest.blocksCount):
+    let pos = indexToPos(params.steps, idx, step)
+    trace "Padding with empty block", idx
+    shallowCopy(data[pos], emptyBlock)
+    without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err:
+      return failure(err)
+    cids[idx] = emptyBlockCid
 
-  success resolved
+  success(resolved)
 
 proc prepareDecodingData(
   self: Erasure,
@@ -171,129 +157,116 @@ proc prepareDecodingData(
   step: int,
   data: ref seq[seq[byte]],
   parityData: ref seq[seq[byte]],
+  cids: ref seq[Cid],
   emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} =
   ## Prepare data for decoding
   ## `encoded` - the encoded manifest
   ## `step` - the current step
   ## `data` - the data to be prepared
   ## `parityData` - the parityData to be prepared
+  ## `cids` - cids of prepared data
   ## `emptyBlock` - the empty block to be used for padding
   ##
 
-  without pendingBlocksIter =?
-    self.getPendingBlocks(
-      encoded,
-      step,
-      encoded.len - 1, encoded.steps), err:
-    trace "Unable to get pending blocks", error = err.msg
-    return failure(err)
+  let
+    indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps))
+    pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
 
   var
     dataPieces = 0
     parityPieces = 0
    resolved = 0
-  for blkFut in pendingBlocksIter:
+  for fut in pendingBlocksIter:
     # Continue to receive blocks until we have just enough for decoding
     # or no more blocks can arrive
     if resolved >= encoded.ecK:
       break
 
-    if (blk, idx) =? (await blkFut):
-      let
-        pos = self.indexToPos(encoded, idx, step)
+    let (blkOrErr, idx) = await fut
+    without blk =? 
blkOrErr, err: + trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg + continue - logScope: - cid = blk.cid - idx = idx - pos = pos - step = step - empty = blk.isEmpty + let + pos = indexToPos(encoded.steps, idx, step) - if idx >= encoded.rounded: - trace "Retrieved parity block" - shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) - parityPieces.inc - else: - trace "Retrieved data block" - shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) - dataPieces.inc + logScope: + cid = blk.cid + idx = idx + pos = pos + step = step + empty = blk.isEmpty - resolved.inc + cids[idx] = blk.cid + if idx >= encoded.rounded: + trace "Retrieved parity block" + shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) + parityPieces.inc + else: + trace "Retrieved data block" + shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) + dataPieces.inc + + resolved.inc return success (dataPieces, parityPieces) -proc prepareManifest( - self: Erasure, - manifest: Manifest, - blocks: int, - parity: int): ?!Manifest = +proc init(_: type EncodingParams, manifest: Manifest, ecK: int, ecM: int): ?!EncodingParams = + if ecK > manifest.blocksCount: + return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount) - logScope: - original_cid = manifest.cid.get() - original_len = manifest.len - blocks = blocks - parity = parity + let + rounded = roundUp(manifest.blocksCount, ecK) + steps = divUp(manifest.blocksCount, ecK) + blocksCount = rounded + (steps * ecM) - if blocks > manifest.len: - trace "Unable to encode manifest, not enough blocks", blocks = blocks, len = manifest.len - return failure("Not enough blocks to encode") - - trace "Preparing erasure coded manifest", blocks, parity - without var encoded =? Manifest.new(manifest, blocks, parity), error: - trace "Unable to create manifest", msg = error.msg - return error.failure - - logScope: - steps = encoded.steps - rounded_blocks = encoded.rounded - new_manifest = encoded.len - - trace "Erasure coded manifest prepared" - - success encoded + EncodingParams( + ecK: ecK, + ecM: ecM, + rounded: rounded, + steps: steps, + blocksCount: blocksCount + ).success proc encodeData( self: Erasure, - manifest: Manifest): Future[?!void] {.async.} = + manifest: Manifest, + params: EncodingParams + ): Future[?!Manifest] {.async.} = ## Encode blocks pointed to by the protected manifest ## ## `manifest` - the manifest to encode ## - var - encoded = manifest - logScope: - steps = encoded.steps - rounded_blocks = encoded.rounded - new_manifest = encoded.len - protected = encoded.protected - ecK = encoded.ecK - ecM = encoded.ecM - - if not encoded.protected: - trace "Manifest is not erasure protected" - return failure("Manifest is not erasure protected") + steps = params.steps + rounded_blocks = params.rounded + blocks_count = params.blocksCount + ecK = params.ecK + ecM = params.ecM var - encoder = self.encoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) - emptyBlock = newSeq[byte](encoded.blockSize.int) + cids = seq[Cid].new() + encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM) + emptyBlock = newSeq[byte](manifest.blockSize.int) + + cids[].setLen(params.blocksCount) try: - for step in 0.. i < tree.leavesCount) + + if err =? 
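
Note: `EncodingParams.init` fixes the whole geometry up front from the `roundUp`/`divUp` helpers in ../utils. Worked numbers — a sketch assuming a manifest with `blocksCount = 10` and module-internal access to the (unexported) fields:

```nim
# rounded     = roundUp(10, 4) = 12   # pad data to a multiple of ecK
# steps       = divUp(10, 4)   = 3    # number of interleaved columns
# blocksCount = 12 + 3 * 2     = 18   # padded data + ecM parity per column
let params = EncodingParams.init(manifest, 4, 2).tryGet()
doAssert (params.rounded, params.steps, params.blocksCount) == (12, 3, 18)
```
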
(await self.store.putSomeProofs(tree, idxIter)).errorOption: + return failure(err) + + let decoded = Manifest.new(encoded) return decoded.success diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 0a3c894d..db504617 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -34,54 +34,45 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = ? manifest.verify() var pbNode = initProtoBuffer() - for c in manifest.blocks: - var pbLink = initProtoBuffer() - pbLink.write(1, c.data.buffer) # write Cid links - pbLink.finish() - pbNode.write(2, pbLink) - # NOTE: The `Data` field in the the `dag-pb` # contains the following protobuf `Message` # # ```protobuf # Message ErasureInfo { - # optional uint32 K = 1; # number of encoded blocks - # optional uint32 M = 2; # number of parity blocks - # optional bytes cid = 3; # cid of the original dataset - # optional uint32 original = 4; # number of original blocks + # optional uint32 ecK = 1; # number of encoded blocks + # optional uint32 ecM = 2; # number of parity blocks + # optional bytes originalTreeCid = 3; # cid of the original dataset + # optional uint32 originalDatasetSize = 4; # size of the original dataset # } # Message Header { - # optional bytes rootHash = 1; # the root (tree) hash + # optional bytes treeCid = 1; # cid (root) of the tree # optional uint32 blockSize = 2; # size of a single block - # optional uint32 blocksLen = 3; # total amount of blocks + # optional uint64 datasetSize = 3; # size of the dataset # optional ErasureInfo erasure = 4; # erasure coding info - # optional uint64 originalBytes = 5;# exact file size # } # ``` # - - let cid = ? manifest.cid + # var treeRootVBuf = initVBuffer() var header = initProtoBuffer() - header.write(1, cid.data.buffer) + header.write(1, manifest.treeCid.data.buffer) header.write(2, manifest.blockSize.uint32) - header.write(3, manifest.len.uint32) - header.write(5, manifest.originalBytes.uint64) + header.write(3, manifest.datasetSize.uint32) if manifest.protected: var erasureInfo = initProtoBuffer() erasureInfo.write(1, manifest.ecK.uint32) erasureInfo.write(2, manifest.ecM.uint32) - erasureInfo.write(3, manifest.originalCid.data.buffer) - erasureInfo.write(4, manifest.originalLen.uint32) + erasureInfo.write(3, manifest.originalTreeCid.data.buffer) + erasureInfo.write(4, manifest.originalDatasetSize.uint32) erasureInfo.finish() header.write(4, erasureInfo) - pbNode.write(1, header) # set the rootHash Cid as the data field + pbNode.write(1, header) # set the treeCid as the data field pbNode.finish() return pbNode.buffer.success -func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = +proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = ## Decode a manifest from a data blob ## @@ -89,86 +80,70 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = pbNode = initProtoBuffer(data) pbHeader: ProtoBuffer pbErasureInfo: ProtoBuffer - rootHash: seq[byte] - originalCid: seq[byte] - originalBytes: uint64 + treeCidBuf: seq[byte] + originalTreeCid: seq[byte] + datasetSize: uint32 blockSize: uint32 - blocksLen: uint32 - originalLen: uint32 + originalDatasetSize: uint32 ecK, ecM: uint32 - blocks: seq[Cid] # Decode `Header` message if pbNode.getField(1, pbHeader).isErr: return failure("Unable to decode `Header` from dag-pb manifest!") # Decode `Header` contents - if pbHeader.getField(1, rootHash).isErr: - return failure("Unable to decode `rootHash` from manifest!") + if pbHeader.getField(1, treeCidBuf).isErr: + return failure("Unable to 
decode `treeCid` from manifest!") if pbHeader.getField(2, blockSize).isErr: return failure("Unable to decode `blockSize` from manifest!") - if pbHeader.getField(3, blocksLen).isErr: - return failure("Unable to decode `blocksLen` from manifest!") - - if pbHeader.getField(5, originalBytes).isErr: - return failure("Unable to decode `originalBytes` from manifest!") + if pbHeader.getField(3, datasetSize).isErr: + return failure("Unable to decode `datasetSize` from manifest!") if pbHeader.getField(4, pbErasureInfo).isErr: return failure("Unable to decode `erasureInfo` from manifest!") - if pbErasureInfo.buffer.len > 0: + let protected = pbErasureInfo.buffer.len > 0 + if protected: if pbErasureInfo.getField(1, ecK).isErr: return failure("Unable to decode `K` from manifest!") if pbErasureInfo.getField(2, ecM).isErr: return failure("Unable to decode `M` from manifest!") - if pbErasureInfo.getField(3, originalCid).isErr: - return failure("Unable to decode `originalCid` from manifest!") + if pbErasureInfo.getField(3, originalTreeCid).isErr: + return failure("Unable to decode `originalTreeCid` from manifest!") - if pbErasureInfo.getField(4, originalLen).isErr: - return failure("Unable to decode `originalLen` from manifest!") + if pbErasureInfo.getField(4, originalDatasetSize).isErr: + return failure("Unable to decode `originalDatasetSize` from manifest!") - let rootHashCid = ? Cid.init(rootHash).mapFailure - var linksBuf: seq[seq[byte]] - if pbNode.getRepeatedField(2, linksBuf).isOk: - for pbLinkBuf in linksBuf: - var - blockBuf: seq[byte] - pbLink = initProtoBuffer(pbLinkBuf) - if pbLink.getField(1, blockBuf).isOk: - blocks.add(? Cid.init(blockBuf).mapFailure) - - if blocksLen.int != blocks.len: - return failure("Total blocks and length of blocks in header don't match!") + let + treeCid = ? Cid.init(treeCidBuf).mapFailure let - self = if pbErasureInfo.buffer.len > 0: + self = if protected: Manifest.new( - rootHash = rootHashCid, - originalBytes = originalBytes.NBytes, + treeCid = treeCid, + datasetSize = datasetSize.NBytes, blockSize = blockSize.NBytes, - blocks = blocks, - version = rootHashCid.cidver, - hcodec = (? rootHashCid.mhash.mapFailure).mcodec, - codec = rootHashCid.mcodec, + version = treeCid.cidver, + hcodec = (? treeCid.mhash.mapFailure).mcodec, + codec = treeCid.mcodec, ecK = ecK.int, ecM = ecM.int, - originalCid = ? Cid.init(originalCid).mapFailure, - originalLen = originalLen.int + originalTreeCid = ? Cid.init(originalTreeCid).mapFailure, + originalDatasetSize = originalDatasetSize.NBytes ) else: Manifest.new( - rootHash = rootHashCid, - originalBytes = originalBytes.NBytes, + treeCid = treeCid, + datasetSize = datasetSize.NBytes, blockSize = blockSize.NBytes, - blocks = blocks, - version = rootHashCid.cidver, - hcodec = (? rootHashCid.mhash.mapFailure).mcodec, - codec = rootHashCid.mcodec + version = treeCid.cidver, + hcodec = (? treeCid.mhash.mapFailure).mcodec, + codec = treeCid.mcodec ) ? 
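# A minimal round-trip of the scalar Header fields documented above (a sketch;
# assumes nim-libp2p's minprotobuf module, which provides the initProtoBuffer /
# write / getField helpers this coder relies on).
import pkg/libp2p/protobuf/minprotobuf

when isMainModule:
  var header = initProtoBuffer()
  header.write(2, 65536'u32)      # blockSize
  header.write(3, 1048576'u32)    # datasetSize
  header.finish()

  var decoded = initProtoBuffer(header.buffer)
  var blockSize, datasetSize: uint32
  assert decoded.getField(2, blockSize).isOk and blockSize == 65536
  assert decoded.getField(3, datasetSize).isOk and datasetSize == 1048576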
self.verify() diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index af622db7..53c37ce3 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -30,19 +30,18 @@ export types type Manifest* = ref object of RootObj - rootHash {.serialize.}: ?Cid # Root (tree) hash of the contained data set - originalBytes* {.serialize.}: NBytes # Exact size of the original (uploaded) file + treeCid {.serialize.}: Cid # Root of the merkle tree + datasetSize {.serialize.}: NBytes # Total size of all blocks blockSize {.serialize.}: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed) - blocks: seq[Cid] # Block Cid version: CidVersion # Cid version hcodec: MultiCodec # Multihash codec codec: MultiCodec # Data set codec case protected {.serialize.}: bool # Protected datasets have erasure coded info of true: - ecK: int # Number of blocks to encode - ecM: int # Number of resulting parity blocks - originalCid: Cid # The original Cid of the dataset being erasure coded - originalLen: int # The length of the original manifest + ecK: int # Number of blocks to encode + ecM: int # Number of resulting parity blocks + originalTreeCid: Cid # The original root of the dataset being erasure coded + originalDatasetSize: NBytes else: discard @@ -53,8 +52,8 @@ type proc blockSize*(self: Manifest): NBytes = self.blockSize -proc blocks*(self: Manifest): seq[Cid] = - self.blocks +proc datasetSize*(self: Manifest): NBytes = + self.datasetSize proc version*(self: Manifest): CidVersion = self.version @@ -74,33 +73,25 @@ proc ecK*(self: Manifest): int = proc ecM*(self: Manifest): int = self.ecM -proc originalCid*(self: Manifest): Cid = - self.originalCid +proc originalTreeCid*(self: Manifest): Cid = + self.originalTreeCid -proc originalLen*(self: Manifest): int = - self.originalLen +proc originalBlocksCount*(self: Manifest): int = + divUp(self.originalDatasetSize.int, self.blockSize.int) + +proc originalDatasetSize*(self: Manifest): NBytes = + self.originalDatasetSize + +proc treeCid*(self: Manifest): Cid = + self.treeCid + +proc blocksCount*(self: Manifest): int = + divUp(self.datasetSize.int, self.blockSize.int) ############################################################ # Operations on block list ############################################################ -func len*(self: Manifest): int = - self.blocks.len - -func `[]`*(self: Manifest, i: Natural): Cid = - self.blocks[i] - -func `[]=`*(self: var Manifest, i: Natural, item: Cid) = - self.rootHash = Cid.none - self.blocks[i] = item - -func `[]`*(self: Manifest, i: BackwardsIndex): Cid = - self.blocks[self.len - i.int] - -func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) = - self.rootHash = Cid.none - self.blocks[self.len - i.int] = item - func isManifest*(cid: Cid): ?!bool = let res = ?cid.contentType().mapFailure(CodexError) ($(res) in ManifestContainers).success @@ -108,25 +99,6 @@ func isManifest*(cid: Cid): ?!bool = func isManifest*(mc: MultiCodec): ?!bool = ($mc in ManifestContainers).success -proc add*(self: Manifest, cid: Cid) = - assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks - self.rootHash = Cid.none - trace "Adding cid to manifest", cid - self.blocks.add(cid) - self.originalBytes = self.blocks.len.NBytes * self.blockSize - -iterator items*(self: Manifest): Cid = - for b in self.blocks: - yield b - -iterator pairs*(self: Manifest): tuple[key: int, val: Cid] = - for pair in self.blocks.pairs(): - yield pair - -func contains*(self: 
Manifest, cid: Cid): bool = - cid in self.blocks - - ############################################################ # Various sizes and verification ############################################################ @@ -134,79 +106,61 @@ func contains*(self: Manifest, cid: Cid): bool = func bytes*(self: Manifest, pad = true): NBytes = ## Compute how many bytes corresponding StoreStream(Manifest, pad) will return if pad or self.protected: - self.len.NBytes * self.blockSize + self.blocksCount.NBytes * self.blockSize else: - self.originalBytes + self.datasetSize func rounded*(self: Manifest): int = ## Number of data blocks in *protected* manifest including padding at the end - roundUp(self.originalLen, self.ecK) + roundUp(self.originalBlocksCount, self.ecK) func steps*(self: Manifest): int = ## Number of EC groups in *protected* manifest - divUp(self.originalLen, self.ecK) + divUp(self.originalBlocksCount, self.ecK) func verify*(self: Manifest): ?!void = ## Check manifest correctness ## - let originalLen = (if self.protected: self.originalLen else: self.len) - if divUp(self.originalBytes, self.blockSize) != originalLen: - return failure newException(CodexError, "Broken manifest: wrong originalBytes") - - if self.protected and (self.len != self.steps * (self.ecK + self.ecM)): - return failure newException(CodexError, "Broken manifest: wrong originalLen") + if self.protected and (self.blocksCount != self.steps * (self.ecK + self.ecM)): + return failure newException(CodexError, "Broken manifest: wrong originalBlocksCount") return success() +proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} = + self.treeCid.success -############################################################ -# Cid computation -############################################################ - -template hashBytes(mh: MultiHash): seq[byte] = - ## get the hash bytes of a multihash object - ## - - mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)] - -proc makeRoot*(self: Manifest): ?!void = - ## Create a tree hash root of the contained - ## block hashes - ## - - var - stack: seq[MultiHash] - - for cid in self: - stack.add(? cid.mhash.mapFailure) - - while stack.len > 1: - let - (b1, b2) = (stack.pop(), stack.pop()) - mh = ? MultiHash.digest( - $self.hcodec, - (b1.hashBytes() & b2.hashBytes())) - .mapFailure - stack.add(mh) - - if stack.len == 1: - let digest = ? EmptyDigests[self.version][self.hcodec].catch - let cid = ? Cid.init(self.version, self.codec, digest).mapFailure - - self.rootHash = cid.some - - success() - -proc cid*(self: Manifest): ?!Cid = - ## Generate a root hash using the treehash algorithm - ## - - if self.rootHash.isNone: - ? 
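# The protected-manifest invariant enforced by verify() above, with concrete
# numbers (a sketch): 10 original blocks, ecK = 3, ecM = 2.
when isMainModule:
  let (originalBlocks, ecK, ecM) = (10, 3, 2)
  let steps = (originalBlocks + ecK - 1) div ecK   # divUp   -> 4
  let rounded = steps * ecK                        # roundUp -> 12
  let blocksCount = rounded + steps * ecM          # 12 + 8  -> 20
  assert blocksCount == steps * (ecK + ecM)        # the verify() check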
self.makeRoot() - - (!self.rootHash).success +proc `==`*(a, b: Manifest): bool = + (a.treeCid == b.treeCid) and + (a.datasetSize == b.datasetSize) and + (a.blockSize == b.blockSize) and + (a.version == b.version) and + (a.hcodec == b.hcodec) and + (a.codec == b.codec) and + (a.protected == b.protected) and + (if a.protected: + (a.ecK == b.ecK) and + (a.ecM == b.ecM) and + (a.originalTreeCid == b.originalTreeCid) and + (a.originalDatasetSize == b.originalDatasetSize) + else: + true) +proc `$`*(self: Manifest): string = + "treeCid: " & $self.treeCid & + ", datasetSize: " & $self.datasetSize & + ", blockSize: " & $self.blockSize & + ", version: " & $self.version & + ", hcodec: " & $self.hcodec & + ", codec: " & $self.codec & + ", protected: " & $self.protected & + (if self.protected: + ", ecK: " & $self.ecK & + ", ecM: " & $self.ecM & + ", originalTreeCid: " & $self.originalTreeCid & + ", originalDatasetSize: " & $self.originalDatasetSize + else: + "") ############################################################ # Constructors @@ -214,67 +168,61 @@ proc cid*(self: Manifest): ?!Cid = proc new*( T: type Manifest, - blocks: openArray[Cid] = [], - protected = false, - version = CIDv1, + treeCid: Cid, + blockSize: NBytes, + datasetSize: NBytes, + version: CidVersion = CIDv1, hcodec = multiCodec("sha2-256"), codec = multiCodec("raw"), - blockSize = DefaultBlockSize -): ?!Manifest = - ## Create a manifest using an array of `Cid`s - ## - - if hcodec notin EmptyDigests[version]: - return failure("Unsupported manifest hash codec!") + protected = false, +): Manifest = T( - blocks: @blocks, + treeCid: treeCid, + blockSize: blockSize, + datasetSize: datasetSize, version: version, codec: codec, hcodec: hcodec, - blockSize: blockSize, - originalBytes: blocks.len.NBytes * blockSize, - protected: protected).success + protected: protected) proc new*( T: type Manifest, manifest: Manifest, + treeCid: Cid, + datasetSize: NBytes, ecK, ecM: int -): ?!Manifest = +): Manifest = ## Create an erasure protected dataset from an - ## un-protected one + ## unprotected one ## + Manifest( + treeCid: treeCid, + datasetSize: datasetSize, + version: manifest.version, + codec: manifest.codec, + hcodec: manifest.hcodec, + blockSize: manifest.blockSize, + protected: true, + ecK: ecK, ecM: ecM, + originalTreeCid: manifest.treeCid, + originalDatasetSize: manifest.datasetSize) - var - self = Manifest( - version: manifest.version, - codec: manifest.codec, - hcodec: manifest.hcodec, - originalBytes: manifest.originalBytes, - blockSize: manifest.blockSize, - protected: true, - ecK: ecK, ecM: ecM, - originalCid: ? manifest.cid, - originalLen: manifest.len) - - let - encodedLen = self.rounded + (self.steps * ecM) - - self.blocks = newSeq[Cid](encodedLen) - - # copy original manifest blocks - for i in 0.. dst.len: + return failure("Not enough space in a destination buffer") + dst[dstPos..= self.leavesCount or index < 0: - return failure("Index " & $index & " out of range [0.." & $self.leaves.high & "]" ) - - var path = newSeq[MerkleHash](self.height - 1) - for level in 0.. newException(CatchableError, "Error calculating hash using codec " & $mcodec & ": " & $c) - ) + let levels = computeLevels(leavesCount) + let totalNodes = levels[^1].offset + 1 + + var tree = MerkleTree(mcodec: mcodec, digestSize: digestSize, leavesCount: leavesCount, nodesBuffer: newSeq[byte](totalNodes * digestSize)) # copy leaves - for i in 0.. 
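# Size bookkeeping for the flat node buffer above (a sketch of the assumed
# level geometry): each level is half the width of the one below, rounded up,
# and levels are laid out back to back, so a 3-leaf tree (see the diagram
# further below) has level widths 3, 2, 1 and 6 nodes in total.
func levelWidthsSketch(leaves: int): seq[int] =
  var w = leaves
  while w > 1:
    result.add(w)
    w = (w + 1) div 2
  result.add(1)

when isMainModule:
  assert levelWidthsSketch(3) == @[3, 2, 1]   # 6 nodes -> 6 * digestSize bytes
  assert levelWidthsSketch(4) == @[4, 2, 1]   # 7 nodes -> 7 * digestSize bytes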
self.nodeBufferToMultiHash(i)) + +proc mcodec*(self: (MerkleTree | MerkleProof)): MultiCodec = + self.mcodec + +proc digestSize*(self: (MerkleTree | MerkleProof)): Natural = + self.digestSize + +proc root*(self: MerkleTree): MultiHash = + let rootIndex = self.len - 1 + self.nodeBufferToMultiHash(rootIndex) + +proc rootCid*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid = + Cid.init(version, dataCodec, self.root).mapFailure + +iterator leaves*(self: MerkleTree): MultiHash = + for i in 0..= self.leavesCount: + return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) + + success(self.nodeBufferToMultiHash(index)) + +proc getLeafCid*(self: MerkleTree, index: Natural, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid = + let leaf = ? self.getLeaf(index) + Cid.init(version, dataCodec, leaf).mapFailure + +proc height*(self: MerkleTree): Natural = + computeTreeHeight(self.leavesCount) + +proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = + ## Extracts proof from a tree for a given index + ## + ## Given a tree built from data blocks A, B and C + ## H5 + ## / \ + ## H3 H4 + ## / \ / + ## H0 H1 H2 + ## | | | + ## A B C + ## + ## Proofs of inclusion (index and path) are + ## - 0,[H1, H4] for data block A + ## - 1,[H0, H4] for data block B + ## - 2,[0x00, H3] for data block C + ## + if index >= self.leavesCount: + return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) + + var zero = newSeq[byte](self.digestSize) + var one = newSeq[byte](self.digestSize) + one[^1] = 0x01 + + let levels = computeLevels(self.leavesCount) + var proofNodesBuffer = newSeq[byte]((levels.len - 1) * self.digestSize) + for level in levels[0..^2]: + let lr = index shr level.index + let siblingIndex = if lr mod 2 == 0: + level.offset + lr + 1 + else: + level.offset + lr - 1 + + var dummyValue = if level.index == 0: zero else: one + + if siblingIndex < level.offset + level.width: + proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = + self.nodesBuffer[siblingIndex * self.digestSize..<(siblingIndex + 1) * self.digestSize] + else: + proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = dummyValue + + success(MerkleProof(mcodec: self.mcodec, digestSize: self.digestSize, index: index, nodesBuffer: proofNodesBuffer)) + +proc `$`*(self: MerkleTree): string {.noSideEffect.} = + "mcodec:" & $self.mcodec & + ", digestSize: " & $self.digestSize & + ", leavesCount: " & $self.leavesCount & + ", nodes: " & $self.nodes + +proc `==`*(a, b: MerkleTree): bool = + (a.mcodec == b.mcodec) and + (a.digestSize == b.digestSize) and + (a.leavesCount == b.leavesCount) and + (a.nodesBuffer == b.nodesBuffer) proc init*( T: type MerkleTree, - leaves: openArray[MerkleHash] + mcodec: MultiCodec, + digestSize: Natural, + leavesCount: Natural, + nodesBuffer: seq[byte] ): ?!MerkleTree = - initTreeFromLeaves(leaves) + let levels = computeLevels(leavesCount) + let totalNodes = levels[^1].offset + 1 + if totalNodes * digestSize == nodesBuffer.len: + success( + MerkleTree( + mcodec: mcodec, + digestSize: digestSize, + leavesCount: leavesCount, + nodesBuffer: nodesBuffer + ) + ) + else: + failure("Expected nodesBuffer len to be " & $(totalNodes * digestSize) & " but was " & $nodesBuffer.len) -proc index*(self: MerkleProof): int = +proc init*( + T: type MerkleTree, + leaves: openArray[MultiHash] +): ?!MerkleTree = + without leaf =? 
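# The sibling arithmetic used by getProof above, in isolation (a sketch,
# ignoring the per-level offset):
func siblingAtSketch(index, level: int): int =
  let lr = index shr level            # node position within this level
  if lr mod 2 == 0: lr + 1 else: lr - 1

when isMainModule:
  # 3-leaf tree from the diagram: the proof for A (index 0) is [H1, H4]
  assert siblingAtSketch(0, 0) == 1   # H1
  assert siblingAtSketch(0, 1) == 1   # second node of level 1, i.e. H4
  # for C (index 2) the level-0 sibling (3) falls outside the level width,
  # which is why its proof substitutes the zero dummy: 2, [0x00, H3]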
leaves.?[0]:
+    return failure("At least one leaf is required")
+
+  var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec)
+
+  for l in leaves:
+    let res = builder.addLeaf(l)
+    if res.isErr:
+      return failure(res.error)
+
+  builder.build()
+
+proc init*(
+  T: type MerkleTree,
+  cids: openArray[Cid]
+): ?!MerkleTree =
+  var leaves = newSeq[MultiHash]()
+
+  for cid in cids:
+    let res = cid.mhash.mapFailure
+    if res.isErr:
+      return failure(res.error)
+    else:
+      leaves.add(res.value)
+
+  MerkleTree.init(leaves)
+
+###########################################################
+# MerkleProof
+###########################################################
+
+proc verifyLeaf*(self: MerkleProof, leaf: MultiHash, treeRoot: MultiHash): ?!bool =
+  if leaf.mcodec != self.mcodec:
+    return failure("Leaf mcodec was " & $leaf.mcodec & ", but " & $self.mcodec & " expected")
+
+  if treeRoot.mcodec != self.mcodec:
+    return failure("Tree root mcodec was " & $treeRoot.mcodec & ", but " & $self.mcodec & " expected")
+
+  var digestBuf = newSeq[byte](self.digestSize)
+  digestBuf[0..^1] = leaf.data.buffer[leaf.dpos..<(leaf.dpos + self.digestSize)]
+
+  let proofLen = self.nodesBuffer.len div self.digestSize
+  var concatBuf = newSeq[byte](2 * self.digestSize)
+  for i in 0.. node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i)))
+
+  for batchNum in 0.. 0):
       trace "Got data from stream", len = chunk.len
-      without blk =? bt.Block.new(chunk):
-        return failure("Unable to init block from chunk!")
-      blockManifest.add(blk.cid)
+      without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
+        return failure(err)
+
+      without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err:
+        return failure(err)
+
+      without blk =? bt.Block.new(cid, chunk, verify = false):
+        return failure("Unable to init block from chunk!")
+
+      cids.add(cid)
+
       if err =? (await self.blockStore.putBlock(blk)).errorOption:
         trace "Unable to store block", cid = blk.cid, err = err.msg
         return failure(&"Unable to store block {blk.cid}")
@@ -208,34 +223,51 @@ proc store*(
   finally:
     await stream.close()

+  without tree =? MerkleTree.init(cids), err:
+    return failure(err)
+
+  without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
+    return failure(err)
+
+  for index, cid in cids:
+    without proof =? tree.getProof(index), err:
+      return failure(err)
+    if err =? (await self.blockStore.putBlockCidAndProof(treeCid, index, cid, proof)).errorOption:
+      # TODO add log here
+      return failure(err)
+
+  let manifest = Manifest.new(
+    treeCid = treeCid,
+    blockSize = blockSize,
+    datasetSize = NBytes(chunker.offset),
+    version = CIDv1,
+    hcodec = hcodec,
+    codec = dataCodec
+  )

   # Generate manifest
-  blockManifest.originalBytes = NBytes(chunker.offset)  # store the exact file size
-  without data =? blockManifest.encode():
+  without data =? manifest.encode(), err:
     return failure(
-      newException(CodexError, "Could not generate dataset manifest!"))
+      newException(CodexError, "Error encoding manifest: " & err.msg))

   # Store as a dag-pb block
-  without manifest =? bt.Block.new(data = data, codec = DagPBCodec):
+  without manifestBlk =? bt.Block.new(data = data, codec = DagPBCodec):
     trace "Unable to init block from manifest data!"
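# The prove/verify cycle enabled by the API above, end to end (a sketch
# composed from the MerkleTree/MerkleProof procs introduced in this change):
import pkg/libp2p
import pkg/questionable/results
import pkg/codex/merkletree

proc proveFirstLeaf(cids: seq[Cid]): ?!bool =
  let tree = ? MerkleTree.init(cids)   # tree over the block cids
  let proof = ? tree.getProof(0)       # inclusion proof for leaf 0
  let leaf = ? tree.getLeaf(0)
  proof.verifyLeaf(leaf, tree.root)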
return failure("Unable to init block from manifest data!") - if isErr (await self.blockStore.putBlock(manifest)): - trace "Unable to store manifest", cid = manifest.cid - return failure("Unable to store manifest " & $manifest.cid) + if isErr (await self.blockStore.putBlock(manifestBlk)): + trace "Unable to store manifest", cid = manifestBlk.cid + return failure("Unable to store manifest " & $manifestBlk.cid) - without cid =? blockManifest.cid, error: - trace "Unable to generate manifest Cid!", exc = error.msg - return failure(error.msg) - - trace "Stored data", manifestCid = manifest.cid, - contentCid = cid, - blocks = blockManifest.len, - size=blockManifest.originalBytes + info "Stored data", manifestCid = manifestBlk.cid, + treeCid = treeCid, + blocks = manifest.blocksCount, + datasetSize = manifest.datasetSize # Announce manifest - await self.discovery.provide(manifest.cid) + await self.discovery.provide(manifestBlk.cid) + await self.discovery.provide(treeCid) - return manifest.cid.success + return manifestBlk.cid.success proc iterateManifests*(node: CodexNodeRef, onManifest: OnManifest) {.async.} = without cids =? await node.blockStore.listBlocks(BlockType.Manifest): @@ -309,7 +341,7 @@ proc requestStorage*( # because the slotSize is used to determine the amount of bytes to reserve # in a Reservations # TODO: slotSize: (encoded.blockSize.int * encoded.steps).u256, - slotSize: (encoded.blockSize.int * encoded.blocks.len).u256, + slotSize: (encoded.blockSize.int * encoded.blocksCount).u256, duration: duration, proofProbability: proofProbability, reward: reward, @@ -319,7 +351,7 @@ proc requestStorage*( content: StorageContent( cid: $encodedBlk.cid, erasure: StorageErasure( - totalChunks: encoded.len.uint64, + totalChunks: encoded.blocksCount.uint64, ), por: StoragePoR( u: @[], # TODO: PoR setup diff --git a/codex/stores.nim b/codex/stores.nim index 48a0df79..5937e9ac 100644 --- a/codex/stores.nim +++ b/codex/stores.nim @@ -4,5 +4,6 @@ import ./stores/networkstore import ./stores/repostore import ./stores/maintenance import ./stores/keyutils +import ./stores/treehelper -export cachestore, blockstore, networkstore, repostore, maintenance, keyutils +export cachestore, blockstore, networkstore, repostore, maintenance, keyutils, treehelper diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 49f97cb2..653df88e 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -18,6 +18,8 @@ import pkg/questionable/results import ../clock import ../blocktype +import ../merkletree +import ../utils export blocktype @@ -27,23 +29,31 @@ type BlockType* {.pure.} = enum Manifest, Block, Both - GetNext* = proc(): Future[?Cid] {.upraises: [], gcsafe, closure.} - - BlocksIter* = ref object - finished*: bool - next*: GetNext - BlockStore* = ref object of RootObj -iterator items*(self: BlocksIter): Future[?Cid] = - while not self.finished: - yield self.next() - method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = ## Get a block from the blockstore ## - raiseAssert("Not implemented!") + raiseAssert("getBlock by cid not implemented!") + +method getBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!Block] {.base.} = + ## Get a block from the blockstore + ## + + raiseAssert("getBlock by treecid not implemented!") + +method getBlock*(self: BlockStore, address: BlockAddress): Future[?!Block] {.base.} = + ## Get a block from the blockstore + ## + + raiseAssert("getBlock by addr not implemented!") + +method getBlockAndProof*(self: BlockStore, 
treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} = + ## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree + ## + + raiseAssert("getBlockAndProof not implemented!") method putBlock*( self: BlockStore, @@ -53,7 +63,19 @@ method putBlock*( ## Put a block to the blockstore ## - raiseAssert("Not implemented!") + raiseAssert("putBlock not implemented!") + +method putBlockCidAndProof*( + self: BlockStore, + treeCid: Cid, + index: Natural, + blockCid: Cid, + proof: MerkleProof +): Future[?!void] {.base.} = + ## Put a block to the blockstore + ## + + raiseAssert("putBlockCidAndProof not implemented!") method ensureExpiry*( self: BlockStore, @@ -70,28 +92,40 @@ method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} = ## Delete a block from the blockstore ## - raiseAssert("Not implemented!") + raiseAssert("delBlock not implemented!") + +method delBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!void] {.base.} = + ## Delete a block from the blockstore + ## + + raiseAssert("delBlock not implemented!") method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base.} = ## Check if the block exists in the blockstore ## - raiseAssert("Not implemented!") + raiseAssert("hasBlock not implemented!") + +method hasBlock*(self: BlockStore, tree: Cid, index: Natural): Future[?!bool] {.base.} = + ## Check if the block exists in the blockstore + ## + + raiseAssert("hasBlock not implemented!") method listBlocks*( self: BlockStore, - blockType = BlockType.Manifest): Future[?!BlocksIter] {.base.} = + blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] {.base.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## - raiseAssert("Not implemented!") + raiseAssert("listBlocks not implemented!") method close*(self: BlockStore): Future[void] {.base.} = ## Close the blockstore, cleaning up resources managed by it. ## For some implementations this may be a no-op ## - raiseAssert("Not implemented!") + raiseAssert("close not implemented!") proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = ## Check if the block exists in the blockstore. @@ -99,3 +133,9 @@ proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = ## return (await self.hasBlock(blk)) |? false + +proc contains*(self: BlockStore, address: BlockAddress): Future[bool] {.async.} = + return if address.leaf: + (await self.hasBlock(address.treeCid, address.index)) |? false + else: + (await self.hasBlock(address.cid)) |? 
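# The two addressing modes behind the overloads above (a sketch; the store and
# cids are placeholders): a block is reachable either by its own cid or as a
# leaf of a tree.
import pkg/chronos
import pkg/codex/stores
import pkg/codex/blocktype

proc fetchBoth(store: BlockStore, treeCid, blkCid: Cid) {.async.} =
  # leaf addressing, resolved through the stored (cid, proof) metadata
  discard await store.getBlock(BlockAddress.init(treeCid, 3))
  # flat addressing by the block's own cid
  discard await store.getBlock(BlockAddress.init(blkCid))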
false diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 0a5acbc8..7ae9fa09 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -25,6 +25,8 @@ import ../units import ../chunker import ../errors import ../manifest +import ../merkletree +import ../utils import ../clock export blockstore @@ -37,6 +39,7 @@ type currentSize*: NBytes size*: NBytes cache: LruCache[Cid, Block] + cidAndProofCache: LruCache[(Cid, Natural), (Cid, MerkleProof)] InvalidBlockSize* = object of CodexError @@ -51,10 +54,10 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = if cid.isEmpty: trace "Empty block, ignoring" - return success cid.emptyBlock + return cid.emptyBlock if cid notin self.cache: - return failure (ref BlockNotFoundError)(msg: "Block not in cache") + return failure (ref BlockNotFoundError)(msg: "Block not in cache " & $cid) try: return success self.cache[cid] @@ -62,6 +65,35 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = trace "Error requesting block from cache", cid, error = exc.msg return failure exc +proc getCidAndProof(self: CacheStore, treeCid: Cid, index: Natural): ?!(Cid, MerkleProof) = + if cidAndProof =? self.cidAndProofCache.getOption((treeCid, index)): + success(cidAndProof) + else: + failure(newException(BlockNotFoundError, "Block not in cache: " & $BlockAddress.init(treeCid, index))) + +method getBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} = + without cidAndProof =? self.getCidAndProof(treeCid, index), err: + return failure(err) + + await self.getBlock(cidAndProof[0]) + +method getBlockAndProof*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} = + without cidAndProof =? self.getCidAndProof(treeCid, index), err: + return failure(err) + + let (cid, proof) = cidAndProof + + without blk =? await self.getBlock(cid), err: + return failure(err) + + success((blk, proof)) + +method getBlock*(self: CacheStore, address: BlockAddress): Future[?!Block] = + if address.leaf: + self.getBlock(address.treeCid, address.index) + else: + self.getBlock(address.cid) + method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore ## @@ -73,6 +105,16 @@ method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = return (cid in self.cache).success +method hasBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} = + without cidAndProof =? self.getCidAndProof(treeCid, index), err: + if err of BlockNotFoundError: + return success(false) + else: + return failure(err) + + await self.hasBlock(cidAndProof[0]) + + func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) = return iterator(): Cid = for cid in self.cache.keys: @@ -81,12 +123,12 @@ func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) = method listBlocks*( self: CacheStore, blockType = BlockType.Manifest -): Future[?!BlocksIter] {.async.} = +): Future[?!AsyncIter[?Cid]] {.async.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## var - iter = BlocksIter() + iter = AsyncIter[?Cid]() let cids = self.cids() @@ -102,7 +144,7 @@ method listBlocks*( cid = cids() if finished(cids): - iter.finished = true + iter.finish return Cid.none without isManifest =? 
cid.isManifest, err: @@ -168,6 +210,16 @@ method putBlock*( discard self.putBlockSync(blk) return success() +method putBlockCidAndProof*( + self: CacheStore, + treeCid: Cid, + index: Natural, + blockCid: Cid, + proof: MerkleProof +): Future[?!void] {.async.} = + self.cidAndProofCache[(treeCid, index)] = (blockCid, proof) + success() + method ensureExpiry*( self: CacheStore, cid: Cid, @@ -193,6 +245,14 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = return success() +method delBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} = + let maybeRemoved = self.cidAndProofCache.del((treeCid, index)) + + if removed =? maybeRemoved: + return await self.delBlock(removed[0]) + + return success() + method close*(self: CacheStore): Future[void] {.async.} = ## Close the blockstore, a no-op for this implementation ## @@ -217,8 +277,10 @@ proc new*( currentSize = 0'nb size = int(cacheSize div chunkSize) cache = newLruCache[Cid, Block](size) + cidAndProofCache = newLruCache[(Cid, Natural), (Cid, MerkleProof)](size) store = CacheStore( cache: cache, + cidAndProofCache: cidAndProofCache, currentSize: currentSize, size: cacheSize) diff --git a/codex/stores/keyutils.nim b/codex/stores/keyutils.nim index 4b8507d0..084794a9 100644 --- a/codex/stores/keyutils.nim +++ b/codex/stores/keyutils.nim @@ -10,6 +10,7 @@ import pkg/upraises push: {.upraises: [].} +import std/sugar import pkg/questionable/results import pkg/datastore import pkg/libp2p @@ -23,6 +24,7 @@ const CodexTotalBlocksKey* = Key.init(CodexBlockTotalNamespace).tryGet CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet + BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet QuotaKey* = Key.init(CodexQuotaNamespace).tryGet QuotaUsedKey* = (QuotaKey / "used").tryGet QuotaReservedKey* = (QuotaKey / "reserved").tryGet @@ -42,3 +44,7 @@ proc createBlockExpirationMetadataKey*(cid: Cid): ?!Key = proc createBlockExpirationMetadataQueryKey*(): ?!Key = let queryString = ? 
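# Shape of the per-leaf metadata key assembled above (a sketch; "/sample/proofs"
# stands in for the real CodexBlockProofNamespace, which is defined elsewhere):
import pkg/datastore
import pkg/questionable/results

when isMainModule:
  let ns = Key.init("/sample/proofs").tryGet        # placeholder namespace
  let key = ((ns / "sometreecid").tryGet / "42").tryGet
  echo key   # expected shape: /sample/proofs/sometreecid/42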
(BlocksTtlKey / "*") Key.init(queryString) + +proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key = + (BlockProofKey / $treeCid).flatMap((k: Key) => k / $index) + \ No newline at end of file diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 4b80b6bb..98b6fab9 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -17,6 +17,7 @@ import pkg/questionable/results import ./repostore import ../utils/timer +import ../utils/asynciter import ../clock import ../systemclock diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 6f7acd25..16e72b21 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -11,45 +11,64 @@ import pkg/upraises push: {.upraises: [].} +import std/sugar + import pkg/chronicles import pkg/chronos import pkg/libp2p -import ../blocktype as bt +import ../blocktype import ../utils/asyncheapqueue +import ../utils/asynciter import ../clock import ./blockstore import ../blockexchange +import ../merkletree +import ../blocktype export blockstore, blockexchange, asyncheapqueue logScope: topics = "codex networkstore" +const BlockPrefetchAmount = 5 + type NetworkStore* = ref object of BlockStore engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} = - trace "Getting block from local store or network", cid +method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} = + trace "Getting block from local store or network", address - without blk =? await self.localStore.getBlock(cid), error: + without blk =? await self.localStore.getBlock(address), error: if not (error of BlockNotFoundError): return failure error - trace "Block not in local store", cid + trace "Block not in local store", address - without newBlock =? (await self.engine.requestBlock(cid)).catch, error: - trace "Unable to get block from exchange engine", cid + without newBlock =? 
(await self.engine.requestBlock(address)).catch, error: + trace "Unable to get block from exchange engine", address return failure error return success newBlock return success blk +method getBlock*(self: NetworkStore, cid: Cid): Future[?!Block] = + ## Get a block from the blockstore + ## + + self.getBlock(BlockAddress.init(cid)) + +method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Block] = + ## Get a block from the blockstore + ## + + self.getBlock(BlockAddress.init(treeCid, index)) + method putBlock*( self: NetworkStore, - blk: bt.Block, + blk: Block, ttl = Duration.none ): Future[?!void] {.async.} = ## Store block locally and notify the network @@ -64,6 +83,15 @@ method putBlock*( await self.engine.resolveBlocks(@[blk]) return success() +method putBlockCidAndProof*( + self: NetworkStore, + treeCid: Cid, + index: Natural, + blockCid: Cid, + proof: MerkleProof +): Future[?!void] = + self.localStore.putBlockCidAndProof(treeCid, index, blockCid, proof) + method ensureExpiry*( self: NetworkStore, cid: Cid, @@ -82,7 +110,7 @@ method ensureExpiry*( method listBlocks*( self: NetworkStore, - blockType = BlockType.Manifest): Future[?!BlocksIter] = + blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] = self.localStore.listBlocks(blockType) method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 922e955c..731ace05 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -12,8 +12,10 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import pkg/chronos/futures import pkg/chronicles -import pkg/libp2p/cid +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/lrucache import pkg/metrics import pkg/questionable import pkg/questionable/results @@ -25,6 +27,8 @@ import ./keyutils import ../blocktype import ../clock import ../systemclock +import ../merkletree +import ../utils export blocktype, cid @@ -58,16 +62,7 @@ type BlockExpiration* = object cid*: Cid expiration*: SecondsSince1970 - - GetNext = proc(): Future[?BlockExpiration] {.upraises: [], gcsafe, closure.} - BlockExpirationIter* = ref object - finished*: bool - next*: GetNext - -iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] = - while not q.finished: - yield q.next() - + proc updateMetrics(self: RepoStore) = codex_repostore_blocks.set(self.totalBlocks.int64) codex_repostore_bytes_used.set(self.quotaUsedBytes.int64) @@ -82,6 +77,63 @@ func available*(self: RepoStore): uint = func available*(self: RepoStore, bytes: uint): bool = return bytes < self.available() +proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] = + ## Encodes a tuple of cid and merkle proof in a following format: + ## | 8-bytes | n-bytes | remaining bytes | + ## | n | cid | proof | + ## + ## where n is a size of cid + ## + let + (cid, proof) = cidAndProof + cidBytes = cid.data.buffer + proofBytes = proof.encode + n = cidBytes.len + nBytes = n.uint64.toBytesBE + + @nBytes & cidBytes & proofBytes + +proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) = + let + n = uint64.fromBytesBE(data[0.. i.ord)) + +proc putAllProofs*(store: BlockStore, tree: MerkleTree): Future[?!void] = + store.putSomeProofs(tree, Iter.fromSlice(0.. 
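# The length-prefixed framing used by encode/decode above, in isolation (a
# sketch): an 8-byte big-endian length n, the n-byte cid, then the proof.
import pkg/stew/endians2

func frame(cidBytes, proofBytes: seq[byte]): seq[byte] =
  @(cidBytes.len.uint64.toBytesBE) & cidBytes & proofBytes

func unframe(data: seq[byte]): (seq[byte], seq[byte]) =
  let n = uint64.fromBytesBE(data[0..<8]).int
  (data[8..<8 + n], data[8 + n..^1])

when isMainModule:
  let (a, b) = (@[1'u8, 2, 3], @[9'u8, 8])
  assert unframe(frame(a, b)) == (a, b)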
items[i]) + +proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] = + ## Creates new iterator from slice + ## + + Iter.fromRange(slice.a.int, slice.b.int, 1) + +proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] = + ## Creates new iterator in range a..b with specified step (default 1) + ## + + var i = a + + proc genNext(): U = + let u = i + inc(i, step) + u + + proc isFinished(): bool = + (step > 0 and i > b) or + (step < 0 and i < b) + + Iter.new(genNext, isFinished) + +proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = + Iter.new( + genNext = () => fn(iter.next()), + isFinished = () => iter.finished + ) + +proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = + var nextT: Option[T] + + proc tryFetch(): void = + nextT = T.none + while not iter.finished: + let t = iter.next() + if predicate(t): + nextT = some(t) + break + + proc genNext(): T = + let t = nextT.unsafeGet + tryFetch() + return t + + proc isFinished(): bool = + nextT.isNone + + tryFetch() + Iter.new(genNext, isFinished) + +proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] = + var ringBuf = newSeq[T](n) + var iterLen = int.high + var i = 0 + + proc tryFetch(j: int): void = + if not iter.finished: + let item = iter.next() + ringBuf[j mod n] = item + if iter.finished: + iterLen = min(j + 1, iterLen) + else: + if j == 0: + iterLen = 0 + + proc genNext(): T = + let item = ringBuf[i mod n] + tryFetch(i + n) + inc i + return item + + proc isFinished(): bool = + i >= iterLen + + # initialize ringBuf with n prefetched values + for j in 0.. 0): + + let blk = Block.new(chunk).tryGet() + cids.add(blk.cid) + (await store.putBlock(blk)).tryGet() + + let + tree = MerkleTree.init(cids).tryGet() + treeCid = tree.rootCid.tryGet() + manifest = Manifest.new( + treeCid = treeCid, + blockSize = NBytes(chunker.chunkSize), + datasetSize = NBytes(chunker.offset), + ) + + for i in 0..= 0: continue pos.add(i) var - blk = (await store.getBlock(manifest[i])).tryGet() + blk = (await store.getBlock(manifest.treeCid, i)).tryGet() bytePos: seq[int] doAssert bytes < blk.data.len diff --git a/tests/codex/helpers/mockchunker.nim b/tests/codex/helpers/mockchunker.nim new file mode 100644 index 00000000..acbe7ab6 --- /dev/null +++ b/tests/codex/helpers/mockchunker.nim @@ -0,0 +1,45 @@ +import std/sequtils + +import pkg/chronos + +import pkg/codex/chunker +import pkg/codex/rng + +export chunker + +type + MockChunker* = Chunker + +proc new*( + T: type MockChunker, + dataset: openArray[byte], + chunkSize: int | NBytes, + pad: bool = false +): MockChunker = + ## Create a chunker that produces data + ## + + let + chunkSize = chunkSize.NBytes + dataset = @dataset + + var consumed = 0 + proc reader(data: ChunkBuffer, len: int): Future[int] {.async, gcsafe, raises: [Defect].} = + + if consumed >= dataset.len: + return 0 + + var read = 0 + while read < len and + read < chunkSize.int and + (consumed + read) < dataset.len: + data[read] = dataset[consumed + read] + read.inc + + consumed += read + return read + + Chunker.new( + reader = reader, + pad = pad, + chunkSize = chunkSize) diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index dd86593e..fa49f878 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -13,7 +13,8 @@ import pkg/libp2p import pkg/questionable import pkg/questionable/results -import codex/stores/repostore +import pkg/codex/stores/repostore +import pkg/codex/utils/asynciter type 
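# How the Iter combinators above compose (a sketch; map and filter are lazy,
# and fromRange drives them):
import std/sugar
import pkg/codex/utils/asynciter

when isMainModule:
  let it = Iter.fromRange(0, 9)
    .map((i: int) => i * i)
    .filter((x: int) => x mod 2 == 0)
  var got: seq[int]
  while not it.finished:
    got.add(it.next())
  assert got == @[0, 4, 16, 36, 64]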
MockRepoStore* = ref object of RepoStore @@ -31,15 +32,14 @@ method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = dec self.iteratorIndex return success() -method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!BlockExpirationIter] {.async.} = +method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} = if self.getBlockExpirationsThrows: raise new CatchableError self.getBeMaxNumber = maxNumber self.getBeOffset = offset - var iter = BlockExpirationIter() - iter.finished = false + var iter = AsyncIter[?BlockExpiration]() self.iteratorIndex = offset var numberLeft = maxNumber @@ -49,7 +49,7 @@ method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): F let selectedBlock = self.testBlockExpirations[self.iteratorIndex] inc self.iteratorIndex return selectedBlock.some - iter.finished = true + iter.finish return BlockExpiration.none iter.next = next diff --git a/tests/codex/merkletree/testcoders.nim b/tests/codex/merkletree/testcoders.nim new file mode 100644 index 00000000..72f517bf --- /dev/null +++ b/tests/codex/merkletree/testcoders.nim @@ -0,0 +1,42 @@ +import std/unittest + +import pkg/questionable/results +import pkg/stew/byteutils + +import pkg/codex/merkletree +import ../helpers + +checksuite "merkletree - coders": + const data = + [ + "0123456789012345678901234567890123456789".toBytes, + "1234567890123456789012345678901234567890".toBytes, + "2345678901234567890123456789012345678901".toBytes, + "3456789012345678901234567890123456789012".toBytes, + "4567890123456789012345678901234567890123".toBytes, + "5678901234567890123456789012345678901234".toBytes, + "6789012345678901234567890123456789012345".toBytes, + "7890123456789012345678901234567890123456".toBytes, + "8901234567890123456789012345678901234567".toBytes, + "9012345678901234567890123456789012345678".toBytes, + ] + + test "encoding and decoding a tree yields the same tree": + var builder = MerkleTreeBuilder.init(multiCodec("sha2-256")).tryGet() + builder.addDataBlock(data[0]).tryGet() + builder.addDataBlock(data[1]).tryGet() + builder.addDataBlock(data[2]).tryGet() + builder.addDataBlock(data[3]).tryGet() + builder.addDataBlock(data[4]).tryGet() + builder.addDataBlock(data[5]).tryGet() + builder.addDataBlock(data[6]).tryGet() + builder.addDataBlock(data[7]).tryGet() + builder.addDataBlock(data[8]).tryGet() + builder.addDataBlock(data[9]).tryGet() + + let tree = builder.build().tryGet() + let encodedBytes = tree.encode() + let decodedTree = MerkleTree.decode(encodedBytes).tryGet() + + check: + tree == decodedTree diff --git a/tests/codex/merkletree/testmerkletree.nim b/tests/codex/merkletree/testmerkletree.nim index 18e4ae9a..5b0e2d09 100644 --- a/tests/codex/merkletree/testmerkletree.nim +++ b/tests/codex/merkletree/testmerkletree.nim @@ -1,86 +1,165 @@ import std/unittest -import std/bitops -import std/random import std/sequtils -import pkg/libp2p -import codex/merkletree/merkletree -import ../helpers +import std/tables + import pkg/questionable/results +import pkg/stew/byteutils +import pkg/nimcrypto/sha2 + +import pkg/codex/merkletree +import ../helpers checksuite "merkletree": + const data = + [ + "0123456789012345678901234567890123456789".toBytes, + "1234567890123456789012345678901234567890".toBytes, + "2345678901234567890123456789012345678901".toBytes, + "3456789012345678901234567890123456789012".toBytes, + "4567890123456789012345678901234567890123".toBytes, + 
"5678901234567890123456789012345678901234".toBytes, + "6789012345678901234567890123456789012345".toBytes, + "7890123456789012345678901234567890123456".toBytes, + "8901234567890123456789012345678901234567".toBytes, + "9012345678901234567890123456789012345678".toBytes, + ] + const sha256 = multiCodec("sha2-256") const sha512 = multiCodec("sha2-512") - proc randomHash(codec: MultiCodec = sha256): MerkleHash = - var data: array[0..31, byte] - for i in 0..31: - data[i] = rand(uint8) - return MultiHash.digest($codec, data).tryGet() - - proc combine(a, b: MerkleHash, codec: MultiCodec = sha256): MerkleHash = + proc combine(a, b: MultiHash, codec: MultiCodec = sha256): MultiHash = var buf = newSeq[byte](a.size + b.size) - for i in 0.. 0): - - let blk = bt.Block.new(chunk).tryGet() - manifest.add(blk.cid) - (await store.putBlock(blk)).tryGet() + manifest = await storeDataGetManifest(store, chunker) proc encode(buffers, parity: int): Future[Manifest] {.async.} = let @@ -50,8 +41,8 @@ asyncchecksuite "Erasure encode/decode": parity)).tryGet() check: - encoded.len mod (buffers + parity) == 0 - encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.blocksCount mod (buffers + parity) == 0 + encoded.rounded == (manifest.blocksCount + (buffers - (manifest.blocksCount mod buffers))) encoded.steps == encoded.rounded div buffers return encoded @@ -64,24 +55,25 @@ asyncchecksuite "Erasure encode/decode": let encoded = await encode(buffers, parity) var - column = rng.rand((encoded.len - 1) div encoded.steps) # random column - dropped: seq[Cid] + column = rng.rand((encoded.blocksCount - 1) div encoded.steps) # random column + dropped: seq[int] for _ in 0.. 0 - - let encodedCid = Cid.init(manifest.version, manifest.codec, mh).tryGet() - check: - encodedCid == manifest.cid.tryGet() - test "Should encode/decode to/from manifest": - let - blocks = (0..<1000).mapIt( - Block.new(("Block " & $it).toBytes).tryGet().cid - ) - var - manifest = Manifest.new(blocks).tryGet() + manifest = Manifest.new( + treeCid = Cid.example, + blockSize = 1.MiBs, + datasetSize = 100.MiBs) let e = manifest.encode().tryGet() decoded = Manifest.decode(e).tryGet() check: - decoded.blocks == blocks - decoded.protected == false + decoded == manifest - test "Should produce a protected manifest": - let - blocks = (0..<333).mapIt( - Block.new(("Block " & $it).toBytes).tryGet().cid - ) - manifest = Manifest.new(blocks).tryGet() - - var - protected = Manifest.new(manifest, 2, 2).tryGet() - - check: - protected.originalCid == manifest.cid.tryGet() - protected.blocks[0..<333] == manifest.blocks - protected.protected == true - protected.originalLen == manifest.len - - # fill up with empty Cid's - for i in protected.rounded.. 
0): - - let blk = bt.Block.new(chunk).tryGet() - (await localStore.putBlock(blk)).tryGet() - manifest.add(blk.cid) - - return manifest + await storeDataGetManifest(localStore, chunker) proc retrieve(cid: Cid): Future[seq[byte]] {.async.} = # Retrieve an entire file contents by file Cid @@ -113,8 +102,7 @@ asyncchecksuite "Test Node": fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet() check: - fetched.cid == manifest.cid - fetched.blocks == manifest.blocks + fetched == manifest test "Block Batching": let @@ -159,7 +147,7 @@ asyncchecksuite "Test Node": let data = await retrieve(manifestCid) check: - data.len == localManifest.originalBytes.int + data.len == localManifest.datasetSize.int data.len == original.len sha256.digest(data) == sha256.digest(original) diff --git a/tests/codex/teststorestream.nim b/tests/codex/teststorestream.nim index 4226f5eb..362f1467 100644 --- a/tests/codex/teststorestream.nim +++ b/tests/codex/teststorestream.nim @@ -23,32 +23,26 @@ asyncchecksuite "StoreStream": return true let - data = [ - [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9], - [byte 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], - [byte 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], - [byte 30, 31, 32, 33, 34, 35, 36, 37, 38, 39], - [byte 40, 41, 42, 43, 44, 45, 46, 47, 48, 49], - [byte 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], - [byte 60, 61, 62, 63, 64, 65, 66, 67, 68, 69], - [byte 70, 71, 72, 73, 74, 75, 76, 77, 78, 79], - [byte 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], - [byte 90, 91, 92, 93, 94, 95, 96, 97, 98, 99], - ] + data = [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, + 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, + 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, + 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, + 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, + 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, + 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, + 90, 91, 92, 93, 94, 95, 96, 97, 98, 99] + chunkSize = 10 teardown: await stream.close() setup: store = CacheStore.new() - manifest = Manifest.new(blockSize = 10'nb).tryGet() + manifest = await storeDataGetManifest(store, MockChunker.new(dataset = data, chunkSize = chunkSize)) stream = StoreStream.new(store, manifest) - for d in data: - let blk = bt.Block.new(d).tryGet() - manifest.add(blk.cid) - (await store.putBlock(blk)).tryGet() - test "Read all blocks < blockSize": var buf = newSeq[byte](8)
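# Geometry of the fixture above (a sketch): 100 sequential bytes stored as ten
# 10-byte blocks and read back through an 8-byte buffer, so reads straddle
# block boundaries and the final read comes up short.
when isMainModule:
  let (dataLen, blockSize, bufLen) = (100, 10, 8)
  assert dataLen div blockSize == 10   # blocks stored
  assert dataLen div bufLen == 12      # full 8-byte reads
  assert dataLen mod bufLen == 4       # trailing short read of 4 bytes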