diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index d93b4cfb..767281b5 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -65,7 +65,7 @@ type proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: - for cid in toSeq(b.pendingBlocks.wantList): + for cid in toSeq(b.pendingBlocks.wantListCids): try: await b.discoveryQueue.put(cid) except CatchableError as exc: diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index f2fc8101..b650a8c0 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -11,16 +11,18 @@ import std/sequtils import std/sets import std/options import std/algorithm +import std/sugar import pkg/chronos import pkg/chronicles -import pkg/libp2p/[cid, switch] +import pkg/libp2p/[cid, switch, multihash] import pkg/metrics import pkg/stint import ../../stores/blockstore -import ../../blocktype as bt +import ../../blocktype import ../../utils +import ../../merkletree import ../protobuf/blockexc import ../protobuf/presence @@ -77,11 +79,14 @@ type address*: EthAddress price*: UInt256 + # BlockIter* = Iter[Block] + +# TODO check usages proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = ## Convenience method to check for entry prepense ## - a.anyIt( it.cid == b ) + a.anyIt( not it.address.leaf and it.address.cid == b ) # attach task scheduler to engine proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = @@ -124,22 +129,23 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" -proc sendWantHave(b: BlockExcEngine, cid: Cid, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = - trace "Sending wantHave request to peers", cid + +proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = + trace "Sending wantHave request to peers", address = $address #TODO for p in peers: if p != selectedPeer: - if cid notin p.peerHave: + if address notin p.peerHave: trace " wantHave > ", peer = p.id await b.network.request.sendWantList( p.id, - @[cid], + @[address], wantType = WantType.WantHave) # we only want to know if the peer has the block -proc sendWantBlock(b: BlockExcEngine, cid: Cid, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = - trace "Sending wantBlock request to", peer = blockPeer.id, cid +proc sendWantBlock(b: BlockExcEngine, address: BlockAddress, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = + trace "Sending wantBlock request to", peer = blockPeer.id, address = $address #TODO await b.network.request.sendWantList( blockPeer.id, - @[cid], + @[address], wantType = WantType.WantBlock) # we want this remote to send us a block proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeerCtx]): ?BlockExcPeerCtx = @@ -152,10 +158,104 @@ proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeer return some(peers[0]) return some(cheapestPeers[0]) # get cheapest + +# TODO think about this one +# proc requestBlock*( +# b: BlockExcEngine, +# treeCid: Cid, +# index: Natural, +# timeout = DefaultBlockTimeout +# ): Future[Block] {.async.} = +# let blockFuture = b.pendingBlocks.getWantHandle(treeCid, index, leavesCount, merkleRoot, timeout) + +# if b.pendingBlocks.isInFlight(treeCid, index): +# return await blockFuture + +# let peers = b.peers.selectCheapest(cid) +# if peers.len == 0: +# b.discovery.queueFindBlocksReq(@[cid]) +# return await blockFuture +# else: +# b.pendingBlocks.setInFlight(treeCid, index) + +# peer = peers[index mod peers.len] # round robin +# await b.sendWantBlock(treeCid, index, peer) +# await b.sendWantHave(treeCid, index, peer, toSeq(b.peers)) + # return await blockFuture +# TODO think about this one + +proc requestBlock( + b: BlockExcEngine, + treeReq: TreeReq, + index: Natural, + timeout = DefaultBlockTimeout +): Future[Block] {.async.} = + let + blockFuture = treeReq.getWantHandle(index, timeout) + address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index) + + if treeReq.isInFlight(index): + return await blockFuture + + let peers = b.peers.selectCheapest(treeReq.treeCid) + if peers.len == 0: + b.discovery.queueFindBlocksReq(@[treeReq.treeCid]) + return await blockFuture + else: + treeReq.setInFlight(index) + let peer = peers[index mod peers.len] # round robin + await b.sendWantBlock(address, peer) + await b.sendWantHave(address, peer, toSeq(b.peers)) + return await blockFuture + +proc requestBlocks*( + b: BlockExcEngine, + treeCid: Cid, + leavesCount: Natural, + merkleRoot: MultiHash, + timeout = DefaultBlockTimeout +): ?!Iter[Block] = + without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount, merkleRoot), err: + return failure(err) + + var + iter = Iter[Block]() + index = 0 + + proc next(): Future[Block] = + if index < leavesCount: + inc index + if index >= leavesCount: + iter.finished = true + return b.requestBlock(treeReq, index - 1, timeout) + else: + let fut = newFuture[Block]("chronos.race()") #TODO fixit + fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid)) + return fut + + iter.next = next + return success(iter) + + +# iterator requestBlocks*( +# b: BlockExcEngine, +# treeCid: Cid, +# leavesCount: Natural, +# merkleRoot: MultiHash, +# timeout = DefaultBlockTimeout +# ): Future[Block] {.async.} = +# ## +# ## +# without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount, merkleRoot), err: +# return err + +# for index in 0.. bool: - not b.peers.anyIt( cid in it.peerHave )) + not b.peers.anyIt( cid in it.peerHaveCids )) -proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = - trace "Schedule a task for new blocks", items = blocks.len +proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Schedule a task for new blocks", items = blocksDelivery.len let - cids = blocks.mapIt( it.cid ) + cids = blocksDelivery.mapIt( it.blk.cid ) # schedule any new peers to provide blocks to for p in b.peers: for c in cids: # for each cid # schedule a peer if it wants at least one cid # and we have it in our local store - if c in p.peerWants: - if await (c in b.localStore): + if c in p.peerWantsCids: + if await (c in b.localStore): # TODO this is cruical, though indirect if b.scheduleTask(p): trace "Task scheduled for peer", peer = p.id else: @@ -279,50 +381,54 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = break # do next peer -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = - trace "Resolving blocks", blocks = blocks.len +proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Resolving blocks", blocks = blocksDelivery.len - b.pendingBlocks.resolve(blocks) - await b.scheduleTasks(blocks) - b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) + b.pendingBlocks.resolve(blocksDelivery) + await b.scheduleTasks(blocksDelivery) + b.discovery.queueProvideBlocksReq(blocksDelivery.mapIt( it.blk.cid )) + +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = + await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)))) proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, - blocks: seq[bt.Block]) {.async.} = - trace "Paying for blocks", blocks = blocks.len + blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Paying for blocks", len = blocksDelivery.len let sendPayment = engine.network.request.sendPayment - price = peer.price(blocks.mapIt(it.cid)) + price = peer.price(blocksDelivery.mapIt(it.address)) if payment =? engine.wallet.pay(peer, price): trace "Sending payment for blocks", price await sendPayment(peer.id, payment) -proc blocksHandler*( +proc blocksDeliveryHandler*( b: BlockExcEngine, peer: PeerId, - blocks: seq[bt.Block]) {.async.} = - trace "Got blocks from peer", peer, len = blocks.len - for blk in blocks: - if isErr (await b.localStore.putBlock(blk)): - trace "Unable to store block", cid = blk.cid + blocksDelivery: seq[BlockDelivery]) {.async.} = + trace "Got blocks from peer", peer, len = blocksDelivery.len - await b.resolveBlocks(blocks) - codexBlockExchangeBlocksReceived.inc(blocks.len.int64) + for bd in blocksDelivery: + if isErr (await b.localStore.putBlock(bd.blk)): + trace "Unable to store block", cid = bd.blk.cid + + await b.resolveBlocks(blocksDelivery) + codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64) let peerCtx = b.peers.get(peer) if peerCtx != nil: - await b.payForBlocks(peerCtx, blocks) + await b.payForBlocks(peerCtx, blocksDelivery) ## shouldn't we remove them from the want-list instead of this: - peerCtx.cleanPresence(blocks.mapIt( it.cid )) + peerCtx.cleanPresence(blocksDelivery.mapIt( it.address )) proc wantListHandler*( b: BlockExcEngine, peer: PeerId, - wantList: Wantlist) {.async.} = + wantList: WantList) {.async.} = trace "Got wantList for peer", peer, items = wantList.entries.len let peerCtx = b.peers.get(peer) @@ -338,14 +444,14 @@ proc wantListHandler*( logScope: peer = peerCtx.id - cid = e.cid + # cid = e.cid wantType = $e.wantType if idx < 0: # updating entry - trace "Processing new want list entry", cid = e.cid + trace "Processing new want list entry", address = $e.address let - have = await e.cid in b.localStore + have = await e.address in b.localStore price = @( b.pricing.get(Pricing(price: 0.u256)) .price.toBytesBE) @@ -354,21 +460,21 @@ proc wantListHandler*( codexBlockExchangeWantHaveListsReceived.inc() if not have and e.sendDontHave: - trace "Adding dont have entry to presence response", cid = e.cid + trace "Adding dont have entry to presence response", address = $e.address presence.add( BlockPresence( - cid: e.cid.data.buffer, + address: e.address, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to presence response", cid = e.cid + trace "Adding have entry to presence response", address = $e.address presence.add( BlockPresence( - cid: e.cid.data.buffer, + address: e.address, `type`: BlockPresenceType.Have, price: price)) elif e.wantType == WantType.WantBlock: - trace "Added entry to peer's want blocks list", cid = e.cid + trace "Added entry to peer's want blocks list", address = $e.address peerCtx.peerWants.add(e) codexBlockExchangeWantBlockListsReceived.inc() else: @@ -468,30 +574,41 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = wantsBlocks.sort(SortOrder.Descending) + proc localLookup(e: Entry): Future[?!BlockDelivery] {.async.} = + if e.address.leaf: + (await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( + (blkAndProof: (Block, MerkleProof)) => + BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1]) + # pb.Block(cid: blk.cid, blk.data, leaf: true, treeCid: e.treeCid, proof: proof) + ) + else: + (await b.localStore.getBlock(e.address.cid)).map( + (blk: Block) => BlockDelivery(address: e.address, blk: blk) + ) + let - blockFuts = await allFinished(wantsBlocks.mapIt( - b.localStore.getBlock(it.cid) - )) + blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup)) # Extract successfully received blocks let - blocks = blockFuts + blocksDelivery = blocksDeliveryFut .filterIt(it.completed and it.read.isOk) .mapIt(it.read.get) - if blocks.len > 0: - trace "Sending blocks to peer", peer = task.id, blocks = blocks.len - await b.network.request.sendBlocks( + if blocksDelivery.len > 0: + trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len + await b.network.request.sendBlocksDelivery( task.id, - blocks) + blocksDelivery + ) - codexBlockExchangeBlocksSent.inc(blocks.len.int64) + codexBlockExchangeBlocksSent.inc(blocksDelivery.len.int64) - trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len + trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len # Remove successfully sent blocks task.peerWants.keepIf( proc(e: Entry): bool = - not blocks.anyIt( it.cid == e.cid ) + not blocksDelivery.anyIt( it.address == e.address ) ) trace "Removed entries from peerWants", items = task.peerWants.len @@ -547,7 +664,7 @@ proc new*( proc blockWantListHandler( peer: PeerId, - wantList: Wantlist): Future[void] {.gcsafe.} = + wantList: WantList): Future[void] {.gcsafe.} = engine.wantListHandler(peer, wantList) proc blockPresenceHandler( @@ -555,10 +672,10 @@ proc new*( presence: seq[BlockPresence]): Future[void] {.gcsafe.} = engine.blockPresenceHandler(peer, presence) - proc blocksHandler( + proc blocksDeliveryHandler( peer: PeerId, - blocks: seq[bt.Block]): Future[void] {.gcsafe.} = - engine.blocksHandler(peer, blocks) + blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} = + engine.blocksDeliveryHandler(peer, blocksDelivery) proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = engine.accountHandler(peer, account) @@ -568,7 +685,7 @@ proc new*( network.handlers = BlockExcHandlers( onWantList: blockWantListHandler, - onBlocks: blocksHandler, + onBlocksDelivery: blocksDeliveryHandler, onPresence: blockPresenceHandler, onAccount: accountHandler, onPayment: paymentHandler) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index e897a66f..18efca1c 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -14,11 +14,16 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronicles +import pkg/questionable +import pkg/questionable/results import pkg/chronos import pkg/libp2p import pkg/metrics import ../../blocktype +import ../protobuf/blockexc + +import ../../merkletree logScope: topics = "codex pendingblocks" @@ -28,17 +33,90 @@ declareGauge(codexBlockExchangePendingBlockRequests, "codex blockexchange pendin const DefaultBlockTimeout* = 10.minutes +# TODO change bool fields delivered/inflight to enum + type BlockReq* = object handle*: Future[Block] inFlight*: bool + LeafReq* = object + handle*: Future[Block] + inFlight*: bool + case delivered*: bool + of true: + proof*: MerkleProof + else: + discard + + TreeReq* = ref object + leaves*: Table[Natural, LeafReq] # TODO consider seq + treeHandle*: Future[MerkleTree] + awaitCount*: Natural + merkleRoot*: MultiHash + treeCid*: Cid + PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, BlockReq] # pending Block requests + trees*: Table[Cid, TreeReq] proc updatePendingBlockGauge(p: PendingBlocksManager) = codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64) +proc getWantHandle*( + treeReq: TreeReq, + index: Natural, + timeout = DefaultBlockTimeout +): Future[Block] {.async.} = + if not index in treeReq.leaves: + let value = LeafReq( + handle: newFuture[Block]("pendingBlocks.getWantHandle"), + inFlight: false, + delivered: false + ) + # discard value # TODO wtf? + treeReq.leaves[index] = value + + try: + return await treeReq.leaves[index].handle.wait(timeout) + except CancelledError as exc: + trace "Blocks cancelled", exc = exc.msg, treeCid = treeReq.treeCid, index = index + raise exc + except CatchableError as exc: + trace "Pending WANT failed or expired", exc = exc.msg, treeCid = treeReq.treeCid, index = index + raise exc + finally: + discard + # TODO handle gc-ing leafs + # p.blocks.del(cid) + # p.updatePendingBlockGauge() + +proc getOrPutTreeReq*( + p: PendingBlocksManager, + treeCid: Cid, + leavesCount: Natural, + merkleRoot: MultiHash, +): ?!TreeReq = + if treeCid notin p.trees: + var value = TreeReq( + treeHandle: newFuture[MerkleTree]("pendingBlocks.getWantHandle"), + merkleRoot: merkleRoot, + awaitCount: leavesCount, + treeCid: treeCid + ) + p.trees[treeCid] = value + return success(value) + else: + try: + let req = p.trees[treeCid] + if req.merkleRoot == merkleRoot and + req.awaitCount <= leavesCount: + return success(req) + else: + return failure("Unexpected root or leaves count for tree with cid " & $treeCid) + except CatchableError as err: #TODO fixit + return failure("fdafafds") + proc getWantHandle*( p: PendingBlocksManager, cid: Cid, @@ -70,16 +148,28 @@ proc getWantHandle*( p.updatePendingBlockGauge() proc resolve*(p: PendingBlocksManager, - blocks: seq[Block]) = + blocksDelivery: seq[BlockDelivery]) = ## Resolve pending blocks ## - for blk in blocks: - # resolve any pending blocks - p.blocks.withValue(blk.cid, pending): + for bd in blocksDelivery: + p.blocks.withValue(bd.blk.cid, pending): if not pending[].handle.completed: - trace "Resolving block", cid = blk.cid - pending[].handle.complete(blk) + trace "Resolving block", cid = bd.blk.cid + pending[].handle.complete(bd.blk) + + # resolve any pending blocks + if bd.address.leaf: + p.trees.withValue(bd.address.treeCid, treeReq): + treeReq[].leaves.withValue(bd.address.index, leafReq): + if not leafReq[].handle.completed: # TODO verify merkle proof + trace "Resolving leaf block", cid = bd.blk.cid + leafReq[].handle.complete(bd.blk) # TODO replace it with new future -> load blk from store by cid + leafReq[].proof = bd.proof # TODO fix it + leafReq[].delivered = true + dec treeReq[].awaitCount + + # TODO if last block produce a merkle tree and save it into the local store and GC everything and run "queueProvideBlocksReq" proc setInFlight*(p: PendingBlocksManager, cid: Cid, @@ -88,6 +178,25 @@ proc setInFlight*(p: PendingBlocksManager, pending[].inFlight = inFlight trace "Setting inflight", cid, inFlight = pending[].inFlight +proc setInFlight*(treeReq: TreeReq, + index: Natural, + inFlight = true) = + treeReq.leaves.withValue(index, leafReq): + leafReq[].inFlight = inFlight + # pending[].inFlight = inFlight + # TODO + trace "Setting inflight", treeCid = treeReq.treeCid, index, inFlight = inFlight + +proc isInFlight*(treeReq: TreeReq, + index: Natural + ): bool = + treeReq.leaves.withValue(index, leafReq): + return leafReq[].inFlight + # treeReq.leaves.?[index].?inFlight ?| false + # p.blocks.withValue(cid, pending): + # result = pending[].inFlight + # trace "Getting inflight", cid, inFlight = result + proc isInFlight*(p: PendingBlocksManager, cid: Cid ): bool = @@ -101,10 +210,24 @@ proc pending*(p: PendingBlocksManager, cid: Cid): bool = proc contains*(p: PendingBlocksManager, cid: Cid): bool = p.pending(cid) -iterator wantList*(p: PendingBlocksManager): Cid = +iterator wantList*(p: PendingBlocksManager): BlockAddress = + for k in p.blocks.keys: + yield BlockAddress(leaf: false, cid: k) + + for treeCid, treeReq in p.trees.pairs: + for index, leafReq in treeReq.leaves.pairs: + if not leafReq.delivered: + yield BlockAddress(leaf: true, treeCid: treeCid, index: index) + +# TODO rename to `discoveryCids` +iterator wantListCids*(p: PendingBlocksManager): Cid = for k in p.blocks.keys: yield k + for k in p.trees.keys: + yield k + +# TODO remove it? iterator wantHandles*(p: PendingBlocksManager): Future[Block] = for v in p.blocks.values: yield v.handle diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 88209f9f..f9e43c2b 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -34,14 +34,14 @@ const MaxInflight* = 100 type - WantListHandler* = proc(peer: PeerId, wantList: Wantlist): Future[void] {.gcsafe.} - BlocksHandler* = proc(peer: PeerId, blocks: seq[bt.Block]): Future[void] {.gcsafe.} + WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} + BlocksDeliveryHandler* = proc(peer: PeerId, blocks: seq[BlockDelivery]): Future[void] {.gcsafe.} BlockPresenceHandler* = proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.} AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} WantListSender* = proc( id: PeerId, - cids: seq[Cid], + cids: seq[BlockAddress], priority: int32 = 0, cancel: bool = false, wantType: WantType = WantType.WantHave, @@ -50,19 +50,19 @@ type BlockExcHandlers* = object onWantList*: WantListHandler - onBlocks*: BlocksHandler + onBlocksDelivery*: BlocksDeliveryHandler onPresence*: BlockPresenceHandler onAccount*: AccountHandler onPayment*: PaymentHandler - BlocksSender* = proc(peer: PeerId, presence: seq[bt.Block]): Future[void] {.gcsafe.} + BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} PaymentSender* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} BlockExcRequest* = object sendWantList*: WantListSender - sendBlocks*: BlocksSender + sendBlocksDelivery*: BlocksDeliverySender sendPresence*: PresenceSender sendAccount*: AccountSender sendPayment*: PaymentSender @@ -94,7 +94,7 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = proc handleWantList( b: BlockExcNetwork, peer: NetworkPeer, - list: Wantlist) {.async.} = + list: WantList) {.async.} = ## Handle incoming want list ## @@ -102,32 +102,12 @@ proc handleWantList( trace "Handling want list for peer", peer = peer.id, items = list.entries.len await b.handlers.onWantList(peer.id, list) -# TODO: make into a template -proc makeWantList*( - cids: seq[Cid], - priority: int = 0, - cancel: bool = false, - wantType: WantType = WantType.WantHave, - full: bool = false, - sendDontHave: bool = false -): Wantlist = - ## make list of wanted entries - ## - - Wantlist( - entries: cids.mapIt( - Entry( - `block`: it.data.buffer, - priority: priority.int32, - cancel: cancel, - wantType: wantType, - sendDontHave: sendDontHave) ), - full: full) - proc sendWantList*( b: BlockExcNetwork, id: PeerId, - cids: seq[Cid], + addresses: seq[BlockAddress], + # cids: seq[Cid], + # leafs: seq[(Cid, seq[Natural])], priority: int32 = 0, cancel: bool = false, wantType: WantType = WantType.WantHave, @@ -137,58 +117,59 @@ proc sendWantList*( ## Send a want message to peer ## - trace "Sending want list to peer", peer = id, `type` = $wantType, items = cids.len - let msg = makeWantList( - cids, - priority, - cancel, - wantType, - full, - sendDontHave) - + trace "Sending want list to peer", peer = id, `type` = $wantType, items = addresses.len + let msg = WantList( + entries: addresses.mapIt( + Entry( + address: it, + priority: priority, + cancel: cancel, + wantType: wantType, + sendDontHave: sendDontHave) ), + full: full) + b.send(id, Message(wantlist: msg)) -proc handleBlocks( +proc handleBlocksDelivery( b: BlockExcNetwork, peer: NetworkPeer, - blocks: seq[pb.Block] + blocksDelivery: seq[BlockDelivery] ) {.async.} = ## Handle incoming blocks ## - if not b.handlers.onBlocks.isNil: - trace "Handling blocks for peer", peer = peer.id, items = blocks.len + if not b.handlers.onBlocksDelivery.isNil: + trace "Handling blocks for peer", peer = peer.id, items = blocksDelivery.len + await b.handlers.onBlocksDelivery(peer.id, blocksDelivery) - var blks: seq[bt.Block] - for blob in blocks: - without cid =? Cid.init(blob.prefix): - trace "Unable to initialize Cid from protobuf message" + # var blks: seq[bt.Block] + # for blob in blocks: + # without cid =? Cid.init(blob.prefix): + # trace "Unable to initialize Cid from protobuf message" - without blk =? bt.Block.new(cid, blob.data, verify = true): - trace "Unable to initialize Block from data" + # without blk =? bt.Block.new(cid, blob.data, verify = true): + # trace "Unable to initialize Block from data" - blks.add(blk) + # blks.add(blk) - await b.handlers.onBlocks(peer.id, blks) +# template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = +# var blks: seq[pb.Block] +# for blk in blocks: +# blks.add(pb.Block( +# prefix: blk.cid.data.buffer, +# data: blk.data +# )) -template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = - var blks: seq[pb.Block] - for blk in blocks: - blks.add(pb.Block( - prefix: blk.cid.data.buffer, - data: blk.data - )) +# blks - blks - -proc sendBlocks*( +proc sendBlocksDelivery*( b: BlockExcNetwork, id: PeerId, - blocks: seq[bt.Block]): Future[void] = + blocksDelivery: seq[BlockDelivery]): Future[void] = ## Send blocks to remote ## - b.send(id, pb.Message(payload: makeBlocks(blocks))) + b.send(id, pb.Message(payload: blocksDelivery)) proc handleBlockPresence( b: BlockExcNetwork, @@ -260,11 +241,11 @@ proc rpcHandler( ## handle rpc messages ## try: - if msg.wantlist.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantlist) + if msg.wantList.entries.len > 0: + asyncSpawn b.handleWantList(peer, msg.wantList) if msg.payload.len > 0: - asyncSpawn b.handleBlocks(peer, msg.payload) + asyncSpawn b.handleBlocksDelivery(peer, msg.payload) if msg.blockPresences.len > 0: asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) @@ -359,7 +340,7 @@ proc new*( proc sendWantList( id: PeerId, - cids: seq[Cid], + cids: seq[BlockAddress], priority: int32 = 0, cancel: bool = false, wantType: WantType = WantType.WantHave, @@ -369,8 +350,8 @@ proc new*( id, cids, priority, cancel, wantType, full, sendDontHave) - proc sendBlocks(id: PeerId, blocks: seq[bt.Block]): Future[void] {.gcsafe.} = - self.sendBlocks(id, blocks) + proc sendBlocksDelivery(id: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} = + self.sendBlocksDelivery(id, blocksDelivery) proc sendPresence(id: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} = self.sendBlockPresence(id, presence) @@ -383,7 +364,7 @@ proc new*( self.request = BlockExcRequest( sendWantList: sendWantList, - sendBlocks: sendBlocks, + sendBlocksDelivery: sendBlocksDelivery, sendPresence: sendPresence, sendAccount: sendAccount, sendPayment: sendPayment) diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 6c9eac1c..560702cc 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -9,6 +9,7 @@ import std/sequtils import std/tables +import std/sugar import pkg/chronicles import pkg/libp2p @@ -20,6 +21,8 @@ import ../protobuf/blockexc import ../protobuf/payments import ../protobuf/presence +import ../../blocktype + export payments, nitro logScope: @@ -28,33 +31,41 @@ logScope: type BlockExcPeerCtx* = ref object of RootObj id*: PeerId - blocks*: Table[Cid, Presence] # remote peer have list including price + blocks*: Table[BlockAddress, Presence] # remote peer have list including price peerWants*: seq[Entry] # remote peers want lists exchanged*: int # times peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id -proc peerHave*(self: BlockExcPeerCtx): seq[Cid] = +proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] = toSeq(self.blocks.keys) -proc contains*(self: BlockExcPeerCtx, cid: Cid): bool = - cid in self.blocks +# TODO return set +proc peerHaveCids*(self: BlockExcPeerCtx): seq[Cid] = + self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).deduplicate + +# TODO return set +proc peerWantsCids*(self: BlockExcPeerCtx): seq[Cid] = + self.peerWants.mapIt(it.address.cidOrTreeCid).deduplicate + +proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool = + address in self.blocks func setPresence*(self: BlockExcPeerCtx, presence: Presence) = - self.blocks[presence.cid] = presence + self.blocks[presence.address] = presence -func cleanPresence*(self: BlockExcPeerCtx, cids: seq[Cid]) = - for cid in cids: - self.blocks.del(cid) +func cleanPresence*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]) = + for a in addresses: + self.blocks.del(a) -func cleanPresence*(self: BlockExcPeerCtx, cid: Cid) = - self.cleanPresence(@[cid]) +func cleanPresence*(self: BlockExcPeerCtx, address: BlockAddress) = + self.cleanPresence(@[address]) -func price*(self: BlockExcPeerCtx, cids: seq[Cid]): UInt256 = +func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 = var price = 0.u256 - for cid in cids: - self.blocks.withValue(cid, precense): + for a in addresses: + self.blocks.withValue(a, precense): price += precense[].price trace "Blocks price", price diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 340d3ee0..27110a7a 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -20,6 +20,7 @@ import pkg/chronicles import pkg/libp2p import ../protobuf/blockexc +import ../../blocktype import ./peercontext export peercontext @@ -60,10 +61,11 @@ func len*(self: PeerCtxStore): int = self.peers.len func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == cid ) ) + toSeq(self.peers.values).filterIt( it.peerHave.anyIt( not it.leaf and it.cid == cid ) ) +# TODO check usages func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.cid == cid ) ) + toSeq(self.peers.values).filterIt( it.peerWants.anyIt( not it.address.leaf and it.address.cid == cid ) ) func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = var peers = self.peersHave(cid) @@ -73,10 +75,12 @@ func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = priceA = 0.u256 priceB = 0.u256 - a.blocks.withValue(cid, precense): + let blockAddress = BlockAddress(leaf: false, cid: cid) + + a.blocks.withValue(blockAddress, precense): priceA = precense[].price - b.blocks.withValue(cid, precense): + b.blocks.withValue(blockAddress, precense): priceB = precense[].price if priceA == priceB: diff --git a/codex/blockexchange/protobuf/blockexc.nim b/codex/blockexchange/protobuf/blockexc.nim index 43a9dff1..ed5d6816 100644 --- a/codex/blockexchange/protobuf/blockexc.nim +++ b/codex/blockexchange/protobuf/blockexc.nim @@ -10,46 +10,55 @@ import std/hashes import std/sequtils import pkg/libp2p +import pkg/stew/endians2 import message +import ../../blocktype + export Message, protobufEncode, protobufDecode export Wantlist, WantType, Entry -export Block, BlockPresenceType, BlockPresence +export BlockDelivery, BlockPresenceType, BlockPresence export AccountMessage, StateChannelUpdate proc hash*(e: Entry): Hash = - hash(e.`block`) + # hash(e.`block`) + if e.address.leaf: + let data = e.address.treeCid.data.buffer & @(e.address.index.uint64.toBytesBE) + hash(data) + else: + hash(e.address.cid.data.buffer) + -proc cid*(e: Entry): Cid = - ## Helper to convert raw bytes to Cid - ## +# proc cid*(e: Entry): Cid = +# ## Helper to convert raw bytes to Cid +# ## - Cid.init(e.`block`).get() +# Cid.init(e.`block`).get() -proc contains*(a: openArray[Entry], b: Cid): bool = +proc contains*(a: openArray[Entry], b: BlockAddress): bool = ## Convenience method to check for peer precense ## - a.filterIt( it.cid == b ).len > 0 + a.filterIt( it.address == b ).len > 0 -proc `==`*(a: Entry, cid: Cid): bool = - return a.cid == cid +proc `==`*(a: Entry, b: BlockAddress): bool = + return a.address == b proc `<`*(a, b: Entry): bool = a.priority < b.priority -proc cid*(e: BlockPresence): Cid = - ## Helper to convert raw bytes to Cid - ## +# proc cid*(e: BlockPresence): Cid = +# ## Helper to convert raw bytes to Cid +# ## - Cid.init(e.cid).get() +# Cid.init(e.cid).get() -proc `==`*(a: BlockPresence, cid: Cid): bool = - return cid(a) == cid +proc `==`*(a: BlockPresence, b: BlockAddress): bool = + return a.address == b -proc contains*(a: openArray[BlockPresence], b: Cid): bool = +proc contains*(a: openArray[BlockPresence], b: BlockAddress): bool = ## Convenience method to check for peer precense ## - a.filterIt( cid(it) == b ).len > 0 + a.filterIt( it.address == b ).len > 0 diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim index bbbfdf49..c196c283 100644 --- a/codex/blockexchange/protobuf/message.nim +++ b/codex/blockexchange/protobuf/message.nim @@ -2,11 +2,16 @@ # and Protobuf encoder/decoder for these messages. # # Eventually all this code should be auto-generated from message.proto. +import std/sugar import pkg/libp2p/protobuf/minprotobuf +import pkg/libp2p/cid import ../../units +import ../../merkletree +import ../../blocktype + const MaxBlockSize* = 100.MiBs.uint MaxMessageSize* = 100.MiBs.uint @@ -17,28 +22,51 @@ type WantHave = 1 Entry* = object - `block`*: seq[byte] # The block cid + address*: BlockAddress priority*: int32 # The priority (normalized). default to 1 cancel*: bool # Whether this revokes an entry wantType*: WantType # Note: defaults to enum 0, ie Block sendDontHave*: bool # Note: defaults to false - Wantlist* = object - entries*: seq[Entry] # A list of wantlist entries - full*: bool # Whether this is the full wantlist. default to false + # blockId: + # case leaf*: bool + # of true: + # treeCid: Cid + # index: int + # else: + # blockCid*: Cid # The block cid #TODO rename it to `cid` or `blockCid` - Block* = object - prefix*: seq[byte] # CID prefix (cid version, multicodec and multihash prefix (type + length) - data*: seq[byte] + WantList* = object + entries*: seq[Entry] # A list of wantList entries + full*: bool # Whether this is the full wantList. default to false + + BlockDelivery* = object + blk*: Block + address*: BlockAddress + proof*: MerkleProof # Present only if `address.leaf` is true + + # case leaf*: bool + # of true: + # treeCid: Cid + # proof: MerkleProof + # else: + # discard BlockPresenceType* = enum Have = 0, DontHave = 1 BlockPresence* = object - cid*: seq[byte] # The block cid + address*: BlockAddress `type`*: BlockPresenceType price*: seq[byte] # Amount of assets to pay for the block (UInt256) + # case leaf*: bool + # of true: + # treeCid: Cid + # index: int + # else: + # blockCid*: Cid + AccountMessage* = object address*: seq[byte] # Ethereum address to which payments should be made @@ -47,8 +75,8 @@ type update*: seq[byte] # Signed Nitro state, serialized as JSON Message* = object - wantlist*: Wantlist - payload*: seq[Block] + wantList*: WantList + payload*: seq[BlockDelivery] blockPresences*: seq[BlockPresence] pendingBytes*: uint account*: AccountMessage @@ -58,9 +86,20 @@ type # Encoding Message into seq[byte] in Protobuf format # +proc write*(pb: var ProtoBuffer, field: int, value: BlockAddress) = + var ipb = initProtoBuffer() + ipb.write(1, value.leaf.uint) + if value.leaf: + ipb.write(2, value.treeCid.data.buffer) + ipb.write(3, value.index.uint64) + else: + ipb.write(4, value.cid.data.buffer) + ipb.finish() + pb.write(field, ipb) + proc write*(pb: var ProtoBuffer, field: int, value: Entry) = var ipb = initProtoBuffer() - ipb.write(1, value.`block`) + ipb.write(1, value.address) ipb.write(2, value.priority.uint64) ipb.write(3, value.cancel.uint) ipb.write(4, value.wantType.uint) @@ -68,7 +107,7 @@ proc write*(pb: var ProtoBuffer, field: int, value: Entry) = ipb.finish() pb.write(field, ipb) -proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) = +proc write*(pb: var ProtoBuffer, field: int, value: WantList) = var ipb = initProtoBuffer() for v in value.entries: ipb.write(1, v) @@ -76,16 +115,19 @@ proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) = ipb.finish() pb.write(field, ipb) -proc write*(pb: var ProtoBuffer, field: int, value: Block) = +proc write*(pb: var ProtoBuffer, field: int, value: BlockDelivery) = var ipb = initProtoBuffer(maxSize = MaxBlockSize) - ipb.write(1, value.prefix) - ipb.write(2, value.data) + ipb.write(1, value.blk.cid.data.buffer) + ipb.write(2, value.blk.data) + ipb.write(3, value.address) + if value.address.leaf: + ipb.write(4, value.proof.encode()) ipb.finish() pb.write(field, ipb) proc write*(pb: var ProtoBuffer, field: int, value: BlockPresence) = var ipb = initProtoBuffer() - ipb.write(1, value.cid) + ipb.write(1, value.address) ipb.write(2, value.`type`.uint) ipb.write(3, value.price) ipb.finish() @@ -105,7 +147,7 @@ proc write*(pb: var ProtoBuffer, field: int, value: StateChannelUpdate) = proc protobufEncode*(value: Message): seq[byte] = var ipb = initProtoBuffer(maxSize = MaxMessageSize) - ipb.write(1, value.wantlist) + ipb.write(1, value.wantList) for v in value.payload: ipb.write(3, v) for v in value.blockPresences: @@ -120,12 +162,41 @@ proc protobufEncode*(value: Message): seq[byte] = # # Decoding Message from seq[byte] in Protobuf format # +proc decode*(_: type BlockAddress, pb: ProtoBuffer): ProtoResult[BlockAddress] = + var + value: BlockAddress + leaf: bool + field: uint64 + cidBuf = newSeq[byte]() + + if ? pb.getField(1, field): + leaf = bool(field) + + if leaf: + var + treeCid: Cid + index: Natural + if ? pb.getField(2, cidBuf): + treeCid = ? Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob) + if ? pb.getField(3, field): + index = field + value = BlockAddress(leaf: true, treeCid: treeCid, index: index) + else: + var cid: Cid + if ? pb.getField(4, cidBuf): + cid = ? Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob) + value = BlockAddress(leaf: false, cid: cid) + + ok(value) proc decode*(_: type Entry, pb: ProtoBuffer): ProtoResult[Entry] = var value = Entry() field: uint64 - discard ? pb.getField(1, value.`block`) + ipb: ProtoBuffer + buf = newSeq[byte]() + if ? pb.getField(1, ipb): + value.address = ? BlockAddress.decode(ipb) if ? pb.getField(2, field): value.priority = int32(field) if ? pb.getField(3, field): @@ -136,9 +207,9 @@ proc decode*(_: type Entry, pb: ProtoBuffer): ProtoResult[Entry] = value.sendDontHave = bool(field) ok(value) -proc decode*(_: type Wantlist, pb: ProtoBuffer): ProtoResult[Wantlist] = +proc decode*(_: type WantList, pb: ProtoBuffer): ProtoResult[WantList] = var - value = Wantlist() + value = WantList() field: uint64 sublist: seq[seq[byte]] if ? pb.getRepeatedField(1, sublist): @@ -148,18 +219,36 @@ proc decode*(_: type Wantlist, pb: ProtoBuffer): ProtoResult[Wantlist] = value.full = bool(field) ok(value) -proc decode*(_: type Block, pb: ProtoBuffer): ProtoResult[Block] = +proc decode*(_: type BlockDelivery, pb: ProtoBuffer): ProtoResult[BlockDelivery] = var - value = Block() - discard ? pb.getField(1, value.prefix) - discard ? pb.getField(2, value.data) + value = BlockDelivery() + field: uint64 + dataBuf = newSeq[byte]() + cidBuf = newSeq[byte]() + cid: Cid + ipb: ProtoBuffer + + if ? pb.getField(1, cidBuf): + cid = ? Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob) + if ? pb.getField(2, dataBuf): + value.blk = ? Block.new(cid, dataBuf, verify = true).mapErr(x => ProtoError.IncorrectBlob) + if ? pb.getField(3, ipb): + value.address = ? BlockAddress.decode(ipb) + + if value.address.leaf: + var proofBuf = newSeq[byte]() + if ? pb.getField(4, proofBuf): + value.proof = ? MerkleProof.decode(proofBuf).mapErr(x => ProtoError.IncorrectBlob) + ok(value) proc decode*(_: type BlockPresence, pb: ProtoBuffer): ProtoResult[BlockPresence] = var value = BlockPresence() field: uint64 - discard ? pb.getField(1, value.cid) + ipb: ProtoBuffer + if ? pb.getField(1, ipb): + value.address = ? BlockAddress.decode(ipb) if ? pb.getField(2, field): value.`type` = BlockPresenceType(field) discard ? pb.getField(3, value.price) @@ -184,10 +273,10 @@ proc protobufDecode*(_: type Message, msg: seq[byte]): ProtoResult[Message] = ipb: ProtoBuffer sublist: seq[seq[byte]] if ? pb.getField(1, ipb): - value.wantlist = ? Wantlist.decode(ipb) + value.wantList = ? WantList.decode(ipb) if ? pb.getRepeatedField(3, sublist): for item in sublist: - value.payload.add(? Block.decode(initProtoBuffer(item, maxSize = MaxBlockSize))) + value.payload.add(? BlockDelivery.decode(initProtoBuffer(item, maxSize = MaxBlockSize))) if ? pb.getRepeatedField(4, sublist): for item in sublist: value.blockPresences.add(? BlockPresence.decode(initProtoBuffer(item))) diff --git a/codex/blockexchange/protobuf/presence.nim b/codex/blockexchange/protobuf/presence.nim index 1a1c6c5c..142c6085 100644 --- a/codex/blockexchange/protobuf/presence.nim +++ b/codex/blockexchange/protobuf/presence.nim @@ -5,6 +5,8 @@ import pkg/questionable/results import pkg/upraises import ./blockexc +import ../../blocktype + export questionable export stint export BlockPresenceType @@ -14,7 +16,7 @@ upraises.push: {.upraises: [].} type PresenceMessage* = blockexc.BlockPresence Presence* = object - cid*: Cid + address*: BlockAddress have*: bool price*: UInt256 @@ -24,22 +26,21 @@ func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 = UInt256.fromBytesBE(bytes).some func init*(_: type Presence, message: PresenceMessage): ?Presence = - without cid =? Cid.init(message.cid) and - price =? UInt256.parse(message.price): + without price =? UInt256.parse(message.price): return none Presence some Presence( - cid: cid, + address: message.address, have: message.`type` == BlockPresenceType.Have, price: price ) -func init*(_: type PresenceMessage, presence: Presence): PresenceMessage = - PresenceMessage( - cid: presence.cid.data.buffer, - `type`: if presence.have: - BlockPresenceType.Have - else: - BlockPresenceType.DontHave, - price: @(presence.price.toBytesBE) - ) +# func init*(_: type PresenceMessage, presence: Presence): PresenceMessage = +# PresenceMessage( +# cid: presence.cid.data.buffer, +# `type`: if presence.have: +# BlockPresenceType.Have +# else: +# BlockPresenceType.DontHave, +# price: @(presence.price.toBytesBE) +# ) diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 950b230c..3d10ebea 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -37,6 +37,14 @@ type cid*: Cid data*: seq[byte] + BlockAddress* = object + case leaf*: bool + of true: + treeCid*: Cid + index*: Natural + else: + cid*: Cid + template EmptyCid*: untyped = var EmptyCid {.global, threadvar.}: @@ -106,6 +114,29 @@ template EmptyBlock*: untyped = EmptyBlock +proc `==`*(a, b: BlockAddress): bool = + a.leaf == b.leaf and + ( + if a.leaf: + a.treeCid == b.treeCid and a.index == b.index + else: + a.cid == b.cid + ) + +proc `$`*(a: BlockAddress): string = + if a.leaf: + "treeCid: " & $a.treeCid & ", index: " & $a.index + else: + "cid: " & $a.cid + +proc repr*(a: BlockAddress): string = $a + +proc cidOrTreeCid*(a: BlockAddress): Cid = + if a.leaf: + a.treeCid + else: + a.cid + proc isEmpty*(cid: Cid): bool = cid == EmptyCid[cid.cidver] .catch diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 9b4df58d..1d93027f 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -104,7 +104,7 @@ proc encode*( blockIdx = toSeq(countup(i, encoded.rounded - 1, encoded.steps)) # request all blocks from the store dataBlocks = await allFinished( - blockIdx.mapIt( self.store.getBlock(encoded[it]) )) + blockIdx.mapIt( self.store.getBlock(manifest.treeCid, it) )) # TODO is it correct? # TODO: this is a tight blocking loop so we sleep here to allow # other events to be processed, this should be addressed @@ -138,7 +138,8 @@ proc encode*( return failure(error) trace "Adding parity block", cid = blk.cid, pos = idx - encoded[idx] = blk.cid + # TODO uncomment below and fix it + # encoded[idx] = blk.cid if isErr (await self.store.putBlock(blk)): trace "Unable to store block!", cid = blk.cid return failure("Unable to store block!") @@ -180,7 +181,7 @@ proc decode*( blockIdx = toSeq(countup(i, encoded.len - 1, encoded.steps)) # request all blocks from the store pendingBlocks = blockIdx.mapIt( - self.store.getBlock(encoded[it]) # Get the data blocks (first K) + self.store.getBlock(encoded.treeCid, it) # Get the data blocks (first K) ) # TODO: this is a tight blocking loop so we sleep here to allow @@ -255,10 +256,12 @@ proc decode*( finally: decoder.release() - without decoded =? Manifest.new(blocks = encoded.blocks[0..= leavesCount: + iter.finished = true + + without leaf =? tree.getLeaf(index - 1), err: + return failure(err) + + without leafCid =? Cid.init(CIDv1, leaf.mcodec, leaf).mapFailure, err: + return failure(err) + + without blk =? await self.getBlock(leafCid), err: + return failure(err) + + return success(blk) + else: + return failure("No more elements for tree with cid " & $treeCid) + + iter.next = next + return success(iter) proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 = let duration = ttl |? self.blockTtl @@ -293,13 +322,13 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = method listBlocks*( self: RepoStore, blockType = BlockType.Manifest -): Future[?!BlocksIter] {.async.} = +): Future[?!CidIter] {.async.} = ## Get the list of blocks in the RepoStore. ## This is an intensive operation ## var - iter = BlocksIter() + iter = CidIter() let key = case blockType: diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 25999f97..1f4387eb 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -38,6 +38,9 @@ type store*: BlockStore # Store where to lookup block contents manifest*: Manifest # List of block CIDs pad*: bool # Pad last block to manifest.blockSize? + iter*: BlockIter + lastBlock: Block + lastIndex: int method initStream*(s: StoreStream) = if s.objName.len == 0: @@ -53,10 +56,13 @@ proc new*( ): StoreStream = ## Create a new StoreStream instance for a given store and manifest ## + + result = StoreStream( store: store, manifest: manifest, pad: pad, + lastIndex: -1, offset: 0) result.initStream() @@ -79,38 +85,42 @@ method readOnce*( ## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`. ## Return how many bytes were actually read before EOF was encountered. ## Raise exception if we are already at EOF. - ## + ## trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len if self.atEof: raise newLPStreamEOFError() - # The loop iterates over blocks in the StoreStream, - # reading them and copying their data into outbuf + # Initialize a block iterator + if self.lastIndex < 0: + without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.leavesCount, self.manifest.treeRoot), err: + raise newLPStreamReadError(err) + + self.iter = iter + var read = 0 # Bytes read so far, and thus write offset in the outbuf while read < nbytes and not self.atEof: - # Compute from the current stream position `self.offset` the block num/offset to read + if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int: + if not self.iter.finished: + without lastBlock =? await self.iter.next(), err: + raise newLPStreamReadError(err) + self.lastBlock = lastBlock + inc self.lastIndex + else: + raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely")) + # Compute how many bytes to read from this block let - blockNum = self.offset div self.manifest.blockSize.int blockOffset = self.offset mod self.manifest.blockSize.int readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize.int - blockOffset]) - # Read contents of block `blockNum` - without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum), error: - # TODO Log tree cid and perhaps also block index - trace "Error when getting a block ", msg = error.msg - raise newLPStreamReadError(error) - - trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset - # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if blk.isEmpty: + if self.lastBlock.isEmpty: zeroMem(pbytes.offset(read), readBytes) else: - copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) + copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes) # Update current positions in the stream and outbuf self.offset += readBytes diff --git a/codex/utils.nim b/codex/utils.nim index 70547c5a..88988ddb 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -14,8 +14,9 @@ import pkg/chronos import ./utils/asyncheapqueue import ./utils/fileutils +import ./utils/iterutils -export asyncheapqueue, fileutils +export asyncheapqueue, fileutils, iterutils func divUp*[T: SomeInteger](a, b : T): T = diff --git a/codex/utils/iterutils.nim b/codex/utils/iterutils.nim new file mode 100644 index 00000000..05012c8e --- /dev/null +++ b/codex/utils/iterutils.nim @@ -0,0 +1,8 @@ +import pkg/chronos +import pkg/upraises + +type + GetNext*[T] = proc(): Future[T] {.upraises: [], gcsafe, closure.} + Iter*[T] = ref object + finished*: bool + next*: GetNext[T] \ No newline at end of file