8.1 KiB
| tags | related | ||||
|---|---|---|---|---|---|
|
|
#codex/want-list #codex/block-exchange
| related | Codex Block Exchange Protocol, Uploading and downloading content in Codex |
|---|
When engine is being created, it subscribes to the PeerEventKind.Joined and PeerEventKind.Left:
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
PeerEventKind.Joined is triggered when peers connects to us, and PeerEventKind.Left when peer disconnects from us.
When peer is joining, we call setupPeer
proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
## Perform initial setup, such as want
## list exchange
##
trace "Setting up peer", peer
if peer notin b.peers:
trace "Setting up new peer", peer
b.peers.add(BlockExcPeerCtx(id: peer))
trace "Added peer", peers = b.peers.len
# broadcast our want list, the other peer will do the same
if b.pendingBlocks.wantListLen > 0:
trace "Sending our want list to a peer", peer
let cids = toSeq(b.pendingBlocks.wantList)
await b.network.request.sendWantList(peer, cids, full = true)
if address =? b.pricing .? address:
await b.network.request.sendAccount(peer, Account(address: address))
Here is where we send the joining peer our WantList.
We get it from PendingBlocksManager. PendingBlocksManager has a list of pending blocks. Every time engine requests a block via requestBlock(address), the block corresponding to the address provided becomes pending. It is done via call:
b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
getWantHandle will put the request address on its blocks list, which is a mapping from BlockAddress to BlockReq:
p.blocks[address] = BlockReq(
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
inFlight: inFlight,
startTime: getMonoTime().ticks,
)
At any given time, the pending blocks form our WantList and it will be sent to the joining peer:
await b.network.request.sendWantList(peer, cids, full = true)
where request.sendWantList is set to:
proc sendWantList(
id: PeerId,
cids: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
): Future[void] {.gcsafe.} =
self.sendWantList(id, cids, priority, cancel, wantType, full, sendDontHave)
in BlockExcNetwork.new.
We see that wantType argument takes the default value WantType.WantHave. The full argument is set to true in this case, which means this is our full WantList.
Thus, intuitively, if a cid (BlockAddress to be precise) is on the WantList with WantType.WantHave it means that the corresponding node wants to have that cid.
Let's look closer at the BlockExcEngine.requestBlock proc:
proc requestBlock*(
b: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
if not b.pendingBlocks.isInFlight(address):
let peers = b.peers.getPeersForBlock(address)
if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(@[address], selected)
await b.sendWantHave(@[address], peers.without)
# Don't let timeouts bubble up. We can't be too broad here or we break
# cancellations.
try:
success await blockFuture
except AsyncTimeoutError as err:
failure err
Warning
requestBlockas we see above is undergoing some important changes and for a good reason. First it will be calleddownloadInternaland thegetWantHandlewill no longer be awaiting on returned handle (thus ultimately it will be doing what it days it does). Other important change to notice is thatsendWantHavewill be called only if there are no peers with the requested address; in the version above we see thatWantHaveis sent even if we have a peer with the request address to which we have just sentWantBlock.
When a node requests a block, we first check if the given pending block has the inFlight attribute set, indicating that the block has been recently requested from a remote node known to have it. If it is not the case, we first gather all the peers that have given cid and the complementary list of peers that do not have the given cid. If no peer in the swarm is having that cid, we will trigger discovery. Otherwise, we (pseudo) randomly choose one peer known to have the given cid and send it the WantBlock request. Subsequently, we then send the WantHave request to all the peers known not to have that cid (so that they know we are interested in it and let us know that have it once it is the case).
Now, let's look what happens when a peer receives the WantList. This is handled by BlockExcEngine.wantListHandler:
proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} =
let peerCtx = b.peers.get(peer)
if peerCtx.isNil:
return
var
presence: seq[BlockPresence]
schedulePeer = false
for e in wantList.entries:
let idx = peerCtx.peerWants.findIt(it.address == e.address)
logScope:
peer = peerCtx.id
address = e.address
wantType = $e.wantType
if idx < 0: # Adding new entry to peer wants
let
have = await e.address in b.localStore
price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
case e.wantType
of WantType.WantHave:
if have:
presence.add(
BlockPresence(
address: e.address, `type`: BlockPresenceType.Have, price: price
)
)
else:
if e.sendDontHave:
presence.add(
BlockPresence(
address: e.address, `type`: BlockPresenceType.DontHave, price: price
)
)
peerCtx.peerWants.add(e)
codex_block_exchange_want_have_lists_received.inc()
of WantType.WantBlock:
peerCtx.peerWants.add(e)
schedulePeer = true
codex_block_exchange_want_block_lists_received.inc()
else: # Updating existing entry in peer wants
# peer doesn't want this block anymore
if e.cancel:
trace "Canceling want for block", address = e.address
peerCtx.peerWants.del(idx)
else:
# peer might want to ask for the same cid with
# different want params
trace "Updating want for block", address = e.address
peerCtx.peerWants[idx] = e # update entry
if presence.len > 0:
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await b.network.request.sendPresence(peer, presence)
if schedulePeer:
if not b.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer
We go though the WantList entries, one-by-one.
- We check if the
WantListitem is already on the locally keptWantListassociated with that peer (peerCtx.peerWants). - If it is not the case, we add new entry to the peer's
WantList:- We first check if we already have the block corresponding to the
WantListitem in ourlocalStore. - If we do, and the
WantListitem isWantHave, we add an entry to thepresencelist, otherwise (i.e. whenWantListitem isWantHavebut we do not have the corresponding block inlocalStore) we add the entry topeerCtx.peerWants. IfWantListitem isWantBlockwe add the corresponding entry topeerCtx.peerWantsand set a flag to schedule a task where we will eventually send the requested block to the remote peer (we do that even regardless of if we have a block or not inlocalStore).
- We first check if we already have the block corresponding to the
- If the
WantListitem is already on the locally keptWantListassociated with that peer, we just update the entry.