---
tags:
- codex/want-list
- codex/block-exchange
related:
- "[[Codex Block Exchange Protocol]]"
- "[[Uploading and downloading content in Codex]]"
---
#codex/want-list #codex/block-exchange
| related | [[Codex Block Exchange Protocol]], [[Uploading and downloading content in Codex]] |
| ------- | --------------------------------------------------------------------------------- |

When the engine is created, it subscribes to the `PeerEventKind.Joined` and `PeerEventKind.Left` events:
```nim
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
```
`PeerEventKind.Joined` is triggered when a peer connects to us, and `PeerEventKind.Left` when a peer disconnects from us.
When a peer joins, we call `setupPeer`:
```nim
proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
  ## Perform initial setup, such as want
  ## list exchange
  ##
  trace "Setting up peer", peer
  if peer notin b.peers:
    trace "Setting up new peer", peer
    b.peers.add(BlockExcPeerCtx(id: peer))
    trace "Added peer", peers = b.peers.len
  # broadcast our want list, the other peer will do the same
  if b.pendingBlocks.wantListLen > 0:
    trace "Sending our want list to a peer", peer
    let cids = toSeq(b.pendingBlocks.wantList)
    await b.network.request.sendWantList(peer, cids, full = true)
  if address =? b.pricing .? address:
    await b.network.request.sendAccount(peer, Account(address: address))
```
Here is where we send the joining peer our `WantList`.
We get it from `PendingBlocksManager`, which keeps a list of *pending* blocks. Every time the engine requests a block via `requestBlock(address)`, the block corresponding to the provided `address` becomes pending. This is done via the call:
```nim
b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
```
`getWantHandle` puts the requested address into its `blocks` table, which is a mapping from `BlockAddress` to `BlockReq`:
```nim
p.blocks[address] = BlockReq(
  handle: newFuture[Block]("pendingBlocks.getWantHandle"),
  inFlight: inFlight,
  startTime: getMonoTime().ticks,
)
```
At any given time, the pending blocks form our `WantList` and it will be sent to the joining peer:
```nim
await b.network.request.sendWantList(peer, cids, full = true)
```
where `request.sendWantList` is set to:
```nim
proc sendWantList(
    id: PeerId,
    cids: seq[BlockAddress],
    priority: int32 = 0,
    cancel: bool = false,
    wantType: WantType = WantType.WantHave,
    full: bool = false,
    sendDontHave: bool = false,
): Future[void] {.gcsafe.} =
  self.sendWantList(id, cids, priority, cancel, wantType, full, sendDontHave)
```
in `BlockExcNetwork.new`.
We see that the `wantType` argument takes the default value `WantType.WantHave`. The `full` argument is set to `true` in this case, which means that this is our full `WantList`.
Thus, intuitively, if a `cid` (a `BlockAddress`, to be precise) is on the `WantList` with `WantType.WantHave`, it means that the corresponding node *wants to have* that `cid`.
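
To make the bookkeeping above concrete, here is a minimal, self-contained sketch of how a table of pending blocks can double as the `WantList`. It is not the Codex implementation: `PendingBlocks`, `WantEntry`, and `fullWantList` are hypothetical stand-ins, `BlockAddress` is reduced to a string, and the future and start time kept in the real `BlockReq` are left out.

```nim
import std/[tables, sequtils]

type
  BlockAddress = string   # assumption: stand-in for the real BlockAddress
  WantType = enum
    WantHave, WantBlock
  BlockReq = object
    inFlight: bool        # the real BlockReq also holds a future and a start time
  WantEntry = object      # hypothetical, simplified want-list entry
    address: BlockAddress
    wantType: WantType
  PendingBlocks = object  # hypothetical, simplified PendingBlocksManager
    blocks: Table[BlockAddress, BlockReq]

proc getWantHandle(p: var PendingBlocks, address: BlockAddress) =
  ## Marks `address` as pending unless it is already tracked.
  if address notin p.blocks:
    p.blocks[address] = BlockReq(inFlight: false)

proc wantList(p: PendingBlocks): seq[BlockAddress] =
  ## The want list is simply the set of pending addresses.
  toSeq(p.blocks.keys)

proc fullWantList(p: PendingBlocks): seq[WantEntry] =
  ## Entries as they would go to a joining peer: `WantHave` by default.
  p.wantList().mapIt(WantEntry(address: it, wantType: WantHave))

var pending = PendingBlocks(blocks: initTable[BlockAddress, BlockReq]())
pending.getWantHandle("block-a")
pending.getWantHandle("block-b")
pending.getWantHandle("block-a") # requesting the same block again keeps a single entry

let entries = pending.fullWantList()
echo entries.len, " entries on the want list"
```

The point of the sketch is only that nothing is stored twice: the `WantList` is derived from whatever is pending at the moment a peer joins.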
Let's look closer at the `BlockExcEngine.requestBlock` proc:
```nim
proc requestBlock*(
    b: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async.} =
  let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
  if not b.pendingBlocks.isInFlight(address):
    let peers = b.peers.getPeersForBlock(address)
    if peers.with.len == 0:
      b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
    else:
      let selected = pickPseudoRandom(address, peers.with)
      asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
      b.pendingBlocks.setInFlight(address)
      await b.sendWantBlock(@[address], selected)
    await b.sendWantHave(@[address], peers.without)
  # Don't let timeouts bubble up. We can't be too broad here or we break
  # cancellations.
  try:
    success await blockFuture
  except AsyncTimeoutError as err:
    failure err
```
> [!warning]
> `requestBlock`, as shown above, is undergoing some important changes, and for good reason. First, it will be renamed to `downloadInternal`, and `getWantHandle` will no longer await the handle it returns (so it will finally do what its name says). The other important change is that `sendWantHave` will be called only if there are no peers with the requested address; in the version above, `WantHave` is sent even if we have a peer with the requested address to which we have just sent `WantBlock`.

When a node *requests* a block, we first check whether the given pending block has the `inFlight` attribute set, which indicates that the block has recently been requested from a remote node known to have it. If that is not the case, we gather all the peers that have the given `cid` and the complementary list of peers that do not have it. If no peer in the swarm has that `cid`, we trigger discovery. Otherwise, we (pseudo-)randomly choose one peer known to have the given `cid` and send it the `WantBlock` request. We then send the `WantHave` request to all the peers known not to have that `cid`, so that they know we are interested in it and can let us know once they have it.
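
The decision described above can be sketched without any networking. The names `Peer`, `Action`, and `planRequest` are hypothetical, addresses are plain strings, and the hash-based index is only a deterministic stand-in for `pickPseudoRandom`, whose actual behaviour is not shown in this note.

```nim
import std/[sequtils, hashes]

type
  BlockAddress = string        # assumption: stand-in for the real BlockAddress
  Peer = object                # hypothetical, simplified peer context
    id: string
    blocks: seq[BlockAddress]  # addresses this peer is known to have
  Action = object              # hypothetical summary of what would be sent
    discover: bool             # no known holder: trigger discovery
    wantBlockTo: string        # peer asked for the block itself ("" if none)
    wantHaveTo: seq[string]    # peers that only receive WantHave

proc planRequest(address: BlockAddress, peers: seq[Peer]): Action =
  let
    withBlock = peers.filterIt(address in it.blocks)
    without = peers.filterIt(address notin it.blocks)
  if withBlock.len == 0:
    result.discover = true
  else:
    # Stand-in for `pickPseudoRandom`: a non-negative index derived from the address hash.
    let idx = (hash(address) and high(int)) mod withBlock.len
    result.wantBlockTo = withBlock[idx].id
  # As in the version of `requestBlock` shown above, WantHave goes out in both cases.
  result.wantHaveTo = without.mapIt(it.id)

let peers = @[
  Peer(id: "p1", blocks: @["block-a"]),
  Peer(id: "p2"),
  Peer(id: "p3"),
]
let plan = planRequest("block-a", peers) # p1 is the only holder
let miss = planRequest("block-z", peers) # nobody has it: discovery
echo plan
echo miss
```

With a single holder, the selection is trivially that peer; with several holders, the sketch picks one deterministically per address.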
Now, let's look at what happens when a peer receives the `WantList`. This is handled by `BlockExcEngine.wantListHandler`:
```nim
proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} =
  let peerCtx = b.peers.get(peer)
  if peerCtx.isNil:
    return
  var
    presence: seq[BlockPresence]
    schedulePeer = false
  for e in wantList.entries:
    let idx = peerCtx.peerWants.findIt(it.address == e.address)
    logScope:
      peer = peerCtx.id
      address = e.address
      wantType = $e.wantType
    if idx < 0: # Adding new entry to peer wants
      let
        have = await e.address in b.localStore
        price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
      case e.wantType
      of WantType.WantHave:
        if have:
          presence.add(
            BlockPresence(
              address: e.address, `type`: BlockPresenceType.Have, price: price
            )
          )
        else:
          if e.sendDontHave:
            presence.add(
              BlockPresence(
                address: e.address, `type`: BlockPresenceType.DontHave, price: price
              )
            )
          peerCtx.peerWants.add(e)
        codex_block_exchange_want_have_lists_received.inc()
      of WantType.WantBlock:
        peerCtx.peerWants.add(e)
        schedulePeer = true
        codex_block_exchange_want_block_lists_received.inc()
    else: # Updating existing entry in peer wants
      # peer doesn't want this block anymore
      if e.cancel:
        trace "Canceling want for block", address = e.address
        peerCtx.peerWants.del(idx)
      else:
        # peer might want to ask for the same cid with
        # different want params
        trace "Updating want for block", address = e.address
        peerCtx.peerWants[idx] = e # update entry
  if presence.len > 0:
    trace "Sending presence to remote", items = presence.mapIt($it).join(",")
    await b.network.request.sendPresence(peer, presence)
  if schedulePeer:
    if not b.scheduleTask(peerCtx):
      warn "Unable to schedule task for peer", peer
```
We go through the `WantList` entries one by one.
1. We check if the `WantList` item is already on the locally kept `WantList` associated with that peer (`peerCtx.peerWants`).
2. If it is not, we add a new entry to the peer's `WantList`:
    1. We first check if we already have the block corresponding to the `WantList` item in our `localStore`.
    2. If the `WantList` item is `WantHave` and we have the block, we add a `Have` entry to the `presence` list. If it is `WantHave` but we do not have the block in `localStore`, we add the entry to `peerCtx.peerWants` (and, if the entry has `sendDontHave` set, we also add a `DontHave` entry to the `presence` list).
    3. If the `WantList` item is `WantBlock`, we add the corresponding entry to `peerCtx.peerWants` and set a flag to schedule a task in which we will eventually send the requested block to the remote peer (we do that regardless of whether we have the block in `localStore`).
3. If the `WantList` item is already on the locally kept `WantList` associated with that peer, we remove the entry when `cancel` is set; otherwise we just update it.
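
The steps above can be condensed into a small synchronous sketch. It mirrors only the branching of `wantListHandler`, under simplifying assumptions: `handleWantList`, `PeerCtx`, `Presence`, and `WantEntry` are hypothetical, the local store is a plain list of addresses, and pricing, metrics, logging, and the actual network calls are left out.

```nim
type
  WantType = enum
    WantHave, WantBlock
  PresenceType = enum
    Have, DontHave
  WantEntry = object         # hypothetical, simplified want-list entry
    address: string
    wantType: WantType
    cancel: bool
    sendDontHave: bool
  Presence = object          # hypothetical, simplified BlockPresence (no price)
    address: string
    kind: PresenceType
  PeerCtx = object           # hypothetical, simplified peer context
    peerWants: seq[WantEntry]

proc handleWantList(
    ctx: var PeerCtx, localStore: seq[string], entries: seq[WantEntry]
): tuple[presence: seq[Presence], schedule: bool] =
  for e in entries:
    # Step 1: is the address already on the want list we keep for this peer?
    var idx = -1
    for i, w in ctx.peerWants:
      if w.address == e.address:
        idx = i
        break
    if idx < 0:
      # Step 2: new entry.
      let have = e.address in localStore
      case e.wantType
      of WantHave:
        if have:
          result.presence.add(Presence(address: e.address, kind: Have))
        else:
          if e.sendDontHave:
            result.presence.add(Presence(address: e.address, kind: DontHave))
          ctx.peerWants.add(e)
      of WantBlock:
        ctx.peerWants.add(e)
        result.schedule = true # a task would be scheduled whether or not we have the block
    elif e.cancel:
      ctx.peerWants.del(idx)   # step 3: the peer no longer wants this block
    else:
      ctx.peerWants[idx] = e   # step 3: same address, updated parameters

var ctx = PeerCtx()
let store = @["block-a"]
let incoming = @[
  WantEntry(address: "block-a", wantType: WantHave),
  WantEntry(address: "block-b", wantType: WantHave, sendDontHave: true),
  WantEntry(address: "block-c", wantType: WantBlock),
]
let (presence, schedule) = ctx.handleWantList(store, incoming)

# A later message cancels the want for "block-b".
discard ctx.handleWantList(
  store, @[WantEntry(address: "block-b", wantType: WantHave, cancel: true)]
)
echo presence.len, " presence entries, schedule = ", schedule
```

In this run, `block-a` is answered with `Have`, `block-b` is answered with `DontHave` and remembered until it is cancelled, and `block-c` is remembered and causes a task to be scheduled.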