diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 5a930967..0eb04f64 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -152,7 +152,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = let peers = await request - trace "Discovered peers", peers = peers.len + trace "Discovered peers for block", peers = peers.len, cid let dialed = await allFinished( peers.mapIt( b.network.dialPeer(it.data) )) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index b8a05935..c517c72a 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -116,14 +116,39 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" +proc sendWantHave(b: BlockExcEngine, cid: Cid, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = + trace "Sending wantHave request to peers", cid + for p in peers: + if p != selectedPeer: + if cid notin p.peerHave: + trace " wantHave > ", peer = p.id + await b.network.request.sendWantList( + p.id, + @[cid], + wantType = WantType.WantHave) # we only want to know if the peer has the block + +proc sendWantBlock(b: BlockExcEngine, cid: Cid, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = + trace "Sending wantBlock request to", peer = blockPeer.id, cid + await b.network.request.sendWantList( + blockPeer.id, + @[cid], + wantType = WantType.WantBlock) # we want this remote to send us a block + +proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeerCtx]): ?BlockExcPeerCtx = + if cheapestPeers.len <= 0: + trace "No cheapest peers, selecting first in list" + let + peers = toSeq(b.peers) # Get any peer + if peers.len <= 0: + return none(BlockExcPeerCtx) + return some(peers[0]) + return some(cheapestPeers[0]) # get cheapest + proc requestBlock*( b: BlockExcEngine, cid: Cid, timeout = DefaultBlockTimeout): Future[bt.Block] {.async.} = - ## Request a block from remotes - ## - - trace "Requesting block", cid, peers = b.peers.len + trace "Begin block request", cid, peers = b.peers.len if b.pendingBlocks.isInFlight(cid): trace "Request handle already pending", cid @@ -132,23 +157,18 @@ proc requestBlock*( let blk = b.pendingBlocks.getWantHandle(cid, timeout) + trace "Selecting peers who have", cid var peers = b.peers.selectCheapest(cid) - if peers.len <= 0: - trace "No cheapest peers, selecting first in list", cid - peers = toSeq(b.peers) # Get any peer - if peers.len <= 0: - trace "No peers to request blocks from", cid + without blockPeer =? b.findCheapestPeerForBlock(peers): + trace "No peers to request blocks from. Queue discovery...", cid b.discovery.queueFindBlocksReq(@[cid]) return await blk - let - blockPeer = peers[0] # get cheapest - proc blockHandleMonitor() {.async.} = try: - trace "Monigoring block handle", cid + trace "Monitoring block handle", cid b.pendingBlocks.setInFlight(cid, true) discard await blk trace "Block handle success", cid @@ -165,32 +185,17 @@ proc requestBlock*( # drop unresponsive peer await b.network.switch.disconnect(blockPeer.id) - trace "Sending block request to peer", peer = blockPeer.id, cid - # monitor block handle asyncSpawn blockHandleMonitor() - # request block - await b.network.request.sendWantList( - blockPeer.id, - @[cid], - wantType = WantType.WantBlock) # we want this remote to send us a block + await b.sendWantBlock(cid, blockPeer) if (peers.len - 1) == 0: trace "No peers to send want list to", cid b.discovery.queueFindBlocksReq(@[cid]) - return await blk # no peers to send wants to + return await blk - # filter out the peer we've already requested from - let remaining = peers[1..min(peers.high, b.peersPerRequest)] - trace "Sending want list to remaining peers", count = remaining.len - for p in remaining: - if cid notin p.peerHave: - # just send wants - await b.network.request.sendWantList( - p.id, - @[cid], - wantType = WantType.WantHave) # we only want to know if the peer has the block + await b.sendWantHave(cid, blockPeer, toSeq(b.peers)) return await blk @@ -198,9 +203,6 @@ proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]) {.async.} = - ## Handle block presence - ## - trace "Received presence update for peer", peer, blocks = blocks.len let peerCtx = b.peers.get(peer) @@ -226,20 +228,17 @@ proc blockPresenceHandler*( ) if dontWantCids.len > 0: - trace "Cleaning peer haves", peer, count = dontWantCids.len peerCtx.cleanPresence(dontWantCids) - trace "Peer want/have", items = peerHave.len, wantList = wantList.len let wantCids = wantList.filterIt( it in peerHave ) if wantCids.len > 0: - trace "Getting blocks based on updated precense", peer, count = wantCids.len + trace "Peer has blocks in our wantList", peer, count = wantCids.len discard await allFinished( - wantCids.mapIt(b.requestBlock(it))) - trace "Requested blocks based on updated precense", peer, count = wantCids.len + wantCids.mapIt(b.sendWantBlock(it, peerCtx))) # if none of the connected peers report our wants in their have list, # fire up discovery @@ -269,10 +268,6 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = break # do next peer proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = - ## Resolve pending blocks from the pending blocks manager - ## and schedule any new task to be ran - ## - trace "Resolving blocks", blocks = blocks.len b.pendingBlocks.resolve(blocks) @@ -296,9 +291,6 @@ proc blocksHandler*( b: BlockExcEngine, peer: PeerId, blocks: seq[bt.Block]) {.async.} = - ## handle incoming blocks - ## - trace "Got blocks from peer", peer, len = blocks.len for blk in blocks: if isErr (await b.localStore.putBlock(blk)): @@ -309,24 +301,22 @@ proc blocksHandler*( peerCtx = b.peers.get(peer) if peerCtx != nil: - # we don't care about this blocks anymore, lets cleanup the list await b.payForBlocks(peerCtx, blocks) + ## shouldn't we remove them from the want-list instead of this: peerCtx.cleanPresence(blocks.mapIt( it.cid )) proc wantListHandler*( b: BlockExcEngine, peer: PeerId, wantList: Wantlist) {.async.} = - ## Handle incoming want lists - ## - - trace "Got want list for peer", peer, items = wantList.entries.len - let peerCtx = b.peers.get(peer) + trace "Got wantList for peer", peer, items = wantList.entries.len + let + peerCtx = b.peers.get(peer) if isNil(peerCtx): return var - precense: seq[BlockPresence] + presence: seq[BlockPresence] for e in wantList.entries: let @@ -347,15 +337,15 @@ proc wantListHandler*( .price.toBytesBE) if not have and e.sendDontHave: - trace "Adding dont have entry to precense response", cid = e.cid - precense.add( + trace "Adding dont have entry to presence response", cid = e.cid + presence.add( BlockPresence( cid: e.cid.data.buffer, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to precense response", cid = e.cid - precense.add( + trace "Adding have entry to presence response", cid = e.cid + presence.add( BlockPresence( cid: e.cid.data.buffer, `type`: BlockPresenceType.Have, @@ -374,10 +364,11 @@ proc wantListHandler*( # different want params peerCtx.peerWants[idx] = e # update entry - if precense.len > 0: - trace "Sending precense to remote", items = precense.len - await b.network.request.sendPresence(peer, precense) + if presence.len > 0: + trace "Sending presence to remote", items = presence.len + await b.network.request.sendPresence(peer, presence) + trace "Scheduling a task for this peer, to look over their want-list", peer if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer @@ -385,7 +376,8 @@ proc accountHandler*( engine: BlockExcEngine, peer: PeerId, account: Account) {.async.} = - let context = engine.peers.get(peer) + let + context = engine.peers.get(peer) if context.isNil: return @@ -403,7 +395,8 @@ proc paymentHandler*( return if channel =? context.paymentChannel: - let sender = account.address + let + sender = account.address discard engine.wallet.acceptPayment(channel, Asset, sender, payment) else: context.paymentChannel = engine.wallet.acceptChannel(payment).option @@ -451,6 +444,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = it.wantType == WantType.WantBlock ) + trace "wantsBlocks", peer = task.id, n = wantsBlocks.len if wantsBlocks.len > 0: trace "Got peer want blocks list", items = wantsBlocks.len @@ -473,7 +467,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = task.id, blocks) - trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len # Remove successfully sent blocks task.peerWants.keepIf( proc(e: Entry): bool = diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index a38196b2..340d3ee0 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -66,10 +66,8 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.cid == cid ) ) func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - var - peers = self.peersHave(cid) + var peers = self.peersHave(cid) - trace "Selecting cheapest peers", peers = peers.len func cmp(a, b: BlockExcPeerCtx): int = var priceA = 0.u256 diff --git a/codex/node.nim b/codex/node.nim index db4c18cd..f580c33c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -83,12 +83,14 @@ proc fetchManifest*( if err =? cid.isManifest.errorOption: return failure "CID has invalid content type for manifest {$cid}" - trace "Received manifest retrieval request", cid + trace "Retrieving manifest for cid", cid without blk =? await node.blockStore.getBlock(cid), err: - trace "Error retriving manifest block", cid, err = err.msg + trace "Error retrieve manifest block", cid, err = err.msg return failure err + trace "Decoding manifest for cid", cid + without manifest =? Manifest.decode(blk), err: trace "Unable to decode as manifest", err = err.msg return failure("Unable to decode as manifest") diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index e31fbbd2..9e815caa 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -32,17 +32,17 @@ type localStore*: BlockStore # local block store method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} = - ## Get a block from a remote peer - ## - trace "Getting block from local store or network", cid without blk =? await self.localStore.getBlock(cid), error: if not (error of BlockNotFoundError): return failure error trace "Block not in local store", cid - # TODO: What if block isn't available in the engine too? - # TODO: add retrieved block to the local store - return (await self.engine.requestBlock(cid)).catch + + without newBlock =? (await self.engine.requestBlock(cid)).catch, error: + trace "Unable to get block from exchange engine", cid + return failure error + + return success newBlock return success blk diff --git a/docs/DownloadFlow.md b/docs/DownloadFlow.md new file mode 100644 index 00000000..040897eb --- /dev/null +++ b/docs/DownloadFlow.md @@ -0,0 +1,68 @@ +# Download Flow +Sequence of interactions that result in dat blocks being transferred across the network. + +## Local Store +When data is available in the local blockstore, + +```mermaid +sequenceDiagram +actor Alice +participant API +Alice->>API: Download(CID) +API->>+Node/StoreStream: Retrieve(CID) +loop Get manifest block, then data blocks + Node/StoreStream->>NetworkStore: GetBlock(CID) + NetworkStore->>LocalStore: GetBlock(CID) + LocalStore->>NetworkStore: Block + NetworkStore->>Node/StoreStream: Block +end +Node/StoreStream->>Node/StoreStream: Handle erasure coding +Node/StoreStream->>-API: Data stream +API->>Alice: Stream download of block +``` + +## Network Store +When data is not found ih the local blockstore, the block-exchange engine is used to discover the location of the block within the network. Connection will be established to the node(s) that have the block, and exchange can take place. + +```mermaid +sequenceDiagram +box +actor Alice +participant API +participant Node/StoreStream +participant NetworkStore +participant Discovery +participant Engine +end +box +participant OtherNode +end +Alice->>API: Download(CID) +API->>+Node/StoreStream: Retrieve(CID) +Node/StoreStream->>-API: Data stream +API->>Alice: Download stream begins +loop Get manifest block, then data blocks + Node/StoreStream->>NetworkStore: GetBlock(CID) + NetworkStore->>Engine: RequestBlock(CID) + opt CID not known + Engine->>Discovery: Discovery Block + Discovery->>Discovery: Locates peers who provide block + Discovery->>Engine: Peers + Engine->>Engine: Update peers admin + end + Engine->>Engine: Select optimal peer + Engine->>OtherNode: Send WantHave list + OtherNode->>Engine: Send BlockPresence + Engine->>Engine: Update peers admin + Engine->>Engine: Decide to buy block + Engine->>OtherNode: Send WantBlock list + OtherNode->>Engine: Send Block + Engine->>NetworkStore: Block + NetworkStore->>NetworkStore: Add to Local store + NetworkStore->>Node/StoreStream: Resolve Block + Node/StoreStream->>Node/StoreStream: Handle erasure coding + Node/StoreStream->>API: Push data to stream +end +API->>Alice: Download stream finishes +``` +