2
0
mirror of https://github.com/status-im/nim-codex.git synced 2025-01-12 11:54:12 +00:00
This commit is contained in:
Ben 2024-12-17 12:20:29 +01:00
parent 4b5c35534d
commit 512a736c79
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
6 changed files with 36 additions and 48 deletions

@ -56,9 +56,19 @@ type
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
proc getCid(address: BlockAddress): Cid =
# We advertise and discover only the CID part of a block address.
# Indices are ignored. This means that multiple blocks of the same tree will
# have a single DHT entry.
if address.leaf:
address.treeCid
else:
address.cid
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
for address in toSeq(b.pendingBlocks.wantList):
let cid = address.getCid()
try:
await b.discoveryQueue.put(cid)
except CancelledError:
@ -88,10 +98,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
trace "Discovery request already in progress", cid
continue
let
haves = b.peers.peersHave(cid)
if haves.len < b.minPeersPerBlock:
if b.peers.countPeersWhoHave(cid) < b.minPeersPerBlock:
try:
let
request = b.discovery
@ -127,8 +134,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
info "Exiting discovery task runner"
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
proc queueFindBlocksReq*(b: DiscoveryEngine, addresses: seq[BlockAddress]) {.inline.} =
for address in addresses:
let cid = address.getCid()
if cid notin b.discoveryQueue:
try:
b.discoveryQueue.putNoWait(cid)

@ -172,7 +172,7 @@ proc monitorBlockHandle(
# drop unresponsive peer
await b.network.switch.disconnect(peerId)
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
b.discovery.queueFindBlocksReq(@[address])
proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
return peers[hash(address) mod peers.len]
@ -187,7 +187,7 @@ proc requestBlock*(
let peers = b.peers.getPeersForBlock(address)
if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
b.discovery.queueFindBlocksReq(@[address])
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
@ -245,21 +245,24 @@ proc blockPresenceHandler*(
# if none of the connected peers report our wants in their have list,
# fire up discovery
b.discovery.queueFindBlocksReq(
toSeq(b.pendingBlocks.wantListCids)
.filter do(cid: Cid) -> bool:
not b.peers.anyIt( cid in it.peerHaveCids ))
toSeq(b.pendingBlocks.wantList).filterIt(b.peers.peersHave(it).len == 0)
)
proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
let
cids = blocksDelivery.mapIt( it.blk.cid )
addresses = blocksDelivery.mapIt( it.address )
# TODO: This code assumes p.peerWants are of type wantBlock, and will schedule
# the block-sending task. But, want might be wantHave. In this case,
# we should send a presence update. Peer is scheduled but task handler
# can only send blocks.
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one cid
# and we have it in our local store
if c in p.peerWantsCids:
if await (c in b.localStore):
for address in addresses:
# schedule a peer if it wants at least one
if address in p.peerWants:
if await (address in b.localStore):
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:

@ -130,19 +130,6 @@ iterator wantList*(p: PendingBlocksManager): BlockAddress =
for a in p.blocks.keys:
yield a
iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
for a in p.blocks.keys:
if not a.leaf:
yield a.cid
iterator wantListCids*(p: PendingBlocksManager): Cid =
var yieldedCids = initHashSet[Cid]()
for a in p.blocks.keys:
let cid = a.cidOrTreeCid
if cid notin yieldedCids:
yieldedCids.incl(cid)
yield cid
iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
for v in p.blocks.values:
yield v.handle

@ -38,12 +38,6 @@ type
proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
toSeq(self.blocks.keys)
proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet
proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet
proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocks

@ -64,15 +64,17 @@ func len*(self: PeerCtxStore): int =
func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == address ) )
func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it.cidOrTreeCid == cid ) )
func countPeersWhoHave*(self: PeerCtxStore, cid: Cid): int =
proc getCid(address: BlockAddress): Cid =
if address.leaf:
address.treeCid
else:
address.cid
self.peers.values.countIt(it.peerHave.anyIt( it.getCid() == cid ) )
func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it == address ) )
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res = PeersForBlock()
for peer in self:

@ -66,12 +66,6 @@ proc `$`*(a: BlockAddress): string =
else:
"cid: " & $a.cid
proc cidOrTreeCid*(a: BlockAddress): Cid =
if a.leaf:
a.treeCid
else:
a.cid
proc address*(b: Block): BlockAddress =
BlockAddress(leaf: false, cid: b.cid)