Cleanup changes
This commit is contained in:
parent
3a2d0926f1
commit
d6b0de080c
|
@ -136,20 +136,23 @@ proc sendWantHave(
|
||||||
for p in peers:
|
for p in peers:
|
||||||
if p notin excluded:
|
if p notin excluded:
|
||||||
if address notin p.peerHave:
|
if address notin p.peerHave:
|
||||||
|
trace "Sending wantHave", address, peer = p.id
|
||||||
await b.network.request.sendWantList(
|
await b.network.request.sendWantList(
|
||||||
p.id,
|
p.id,
|
||||||
@[address],
|
@[address],
|
||||||
wantType = WantType.WantHave) # we only want to know if the peer has the block
|
wantType = WantType.WantHave) # we only want to know if the peer has the block
|
||||||
|
codex_block_exchange_want_have_lists_sent.inc()
|
||||||
|
|
||||||
proc sendWantBlock(
|
proc sendWantBlock(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
address: BlockAddress, # pluralize this entire call chain, please
|
address: BlockAddress, # pluralize this entire call chain, please
|
||||||
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
||||||
trace "Sending wantBlock request to", peer = blockPeer.id, address
|
trace "Sending wantBlock", address, peer = blockPeer.id
|
||||||
await b.network.request.sendWantList(
|
await b.network.request.sendWantList(
|
||||||
blockPeer.id,
|
blockPeer.id,
|
||||||
@[address],
|
@[address],
|
||||||
wantType = WantType.WantBlock) # we want this remote to send us a block
|
wantType = WantType.WantBlock) # we want this remote to send us a block
|
||||||
|
codex_block_exchange_want_block_lists_sent.inc()
|
||||||
|
|
||||||
proc monitorBlockHandle(
|
proc monitorBlockHandle(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
|
@ -183,24 +186,17 @@ proc requestBlock*(
|
||||||
|
|
||||||
if not b.pendingBlocks.isInFlight(address):
|
if not b.pendingBlocks.isInFlight(address):
|
||||||
let peers = b.peers.selectCheapest(address)
|
let peers = b.peers.selectCheapest(address)
|
||||||
if peers.len == 0:
|
if peers.len > 0:
|
||||||
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
let peer = peers[hash(address) mod peers.len]
|
||||||
|
trace "Existing peers found for address.", address, nPeers = peers.len, selectedPeer = peer.id
|
||||||
let maybePeer =
|
|
||||||
if peers.len > 0:
|
|
||||||
peers[hash(address) mod peers.len].some
|
|
||||||
elif b.peers.len > 0:
|
|
||||||
toSeq(b.peers)[hash(address) mod b.peers.len].some
|
|
||||||
else:
|
|
||||||
BlockExcPeerCtx.none
|
|
||||||
|
|
||||||
if peer =? maybePeer:
|
|
||||||
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
|
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
|
||||||
b.pendingBlocks.setInFlight(address)
|
b.pendingBlocks.setInFlight(address)
|
||||||
await b.sendWantBlock(address, peer)
|
await b.sendWantBlock(address, peer)
|
||||||
codex_block_exchange_want_block_lists_sent.inc()
|
|
||||||
await b.sendWantHave(address, @[peer], toSeq(b.peers))
|
await b.sendWantHave(address, @[peer], toSeq(b.peers))
|
||||||
codex_block_exchange_want_have_lists_sent.inc()
|
else:
|
||||||
|
trace "No existing peers found for address.", address
|
||||||
|
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
||||||
|
await b.sendWantHave(address, @[], toSeq(b.peers))
|
||||||
|
|
||||||
# Don't let timeouts bubble up. We can't be too broad here or we break
|
# Don't let timeouts bubble up. We can't be too broad here or we break
|
||||||
# cancellations.
|
# cancellations.
|
||||||
|
@ -494,7 +490,9 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
|
||||||
trace "Sending our want list to a peer", peer
|
trace "Sending our want list to a peer", peer
|
||||||
let cids = toSeq(b.pendingBlocks.wantList)
|
let cids = toSeq(b.pendingBlocks.wantList)
|
||||||
await b.network.request.sendWantList(
|
await b.network.request.sendWantList(
|
||||||
peer, cids, full = true)
|
peer, cids, full = true,
|
||||||
|
wantType = WantType.WantHave)
|
||||||
|
codex_block_exchange_want_have_lists_sent.inc()
|
||||||
|
|
||||||
if address =? b.pricing.?address:
|
if address =? b.pricing.?address:
|
||||||
await b.network.request.sendAccount(peer, Account(address: address))
|
await b.network.request.sendAccount(peer, Account(address: address))
|
||||||
|
|
|
@ -71,19 +71,17 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
|
||||||
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )
|
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )
|
||||||
|
|
||||||
func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
|
func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
|
||||||
# assume that the price for all leaves in a tree is the same
|
var peers = self.peersHave(address)
|
||||||
let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid)
|
|
||||||
var peers = self.peersHave(rootAddress)
|
|
||||||
|
|
||||||
func cmp(a, b: BlockExcPeerCtx): int =
|
func cmp(a, b: BlockExcPeerCtx): int =
|
||||||
var
|
var
|
||||||
priceA = 0.u256
|
priceA = 0.u256
|
||||||
priceB = 0.u256
|
priceB = 0.u256
|
||||||
|
|
||||||
a.blocks.withValue(rootAddress, precense):
|
a.blocks.withValue(address, precense):
|
||||||
priceA = precense[].price
|
priceA = precense[].price
|
||||||
|
|
||||||
b.blocks.withValue(rootAddress, precense):
|
b.blocks.withValue(address, precense):
|
||||||
priceB = precense[].price
|
priceB = precense[].price
|
||||||
|
|
||||||
if priceA == priceB:
|
if priceA == priceB:
|
||||||
|
@ -94,7 +92,7 @@ func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPee
|
||||||
-1
|
-1
|
||||||
|
|
||||||
peers.sort(cmp)
|
peers.sort(cmp)
|
||||||
trace "Selected cheapest peers", peers = peers.len
|
trace "Selected cheapest peers", address = address, peers = peers.len
|
||||||
return peers
|
return peers
|
||||||
|
|
||||||
proc new*(T: type PeerCtxStore): PeerCtxStore =
|
proc new*(T: type PeerCtxStore): PeerCtxStore =
|
||||||
|
|
Loading…
Reference in New Issue