feat: modify retry mechanism; add DHT guard rails; improve block cancellation handling

This commit is contained in:
gmega 2025-07-01 21:20:14 -03:00 committed by Chrysostomos Nanakos
parent 38a0656cab
commit d91fd053e7
No known key found for this signature in database
3 changed files with 102 additions and 33 deletions

View File

@ -73,6 +73,9 @@ const
DefaultMaxBlocksPerMessage = 500
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
# Don't do more than one discovery request per `DiscoveryRateLimit` seconds.
DiscoveryRateLimit = 1.seconds
DefaultPeerActivityTimeout = 1.minutes
type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
@ -94,6 +97,7 @@ type
pricing*: ?Pricing # Optional bandwidth pricing
discovery*: DiscoveryEngine
advertiser*: Advertiser
lastDiscRequest: Moment # time of last discovery request
Pricing* = object
address*: EthAddress
@ -193,6 +197,14 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
# efficient about it.
await self.refreshBlockKnowledge(peer)
proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
if self.lastDiscRequest + DiscoveryRateLimit < Moment.now():
trace "Searching for new peers for", cid = cid
self.lastDiscRequest = Moment.now() # always refresh before calling await!
self.discovery.queueFindBlocksReq(@[cid])
else:
trace "Not searching for new peers, rate limit not expired", cid = cid
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
Rng.instance.sample(peers)
@ -215,34 +227,56 @@ proc downloadInternal(
handle.fail(newException(RetriesExhaustedError, "Error retries exhausted"))
break
trace "Running retry handle"
let peers = self.peers.getPeersForBlock(address)
logScope:
peersWith = peers.with.len
peersWithout = peers.without.len
trace "Peers for block"
if peers.with.len > 0:
self.pendingBlocks.setInFlight(address, true)
await self.sendWantBlock(@[address], peers.with.randomPeer)
else:
self.pendingBlocks.setInFlight(address, false)
if peers.with.len == 0:
# We know of no peers that have the block.
if peers.without.len > 0:
# We have peers connected, but none of them have the block. This
# If we have peers connected but none of them have the block, this
# could be because our knowledge about what they have has run stale.
# Tries to refresh it.
await self.refreshBlockKnowledge()
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
# Also tries to look for new peers for good measure.
# TODO: in the future, peer search and knowledge maintenance should
# be completely decoupled from one another. It is very hard to
# control what happens and how many neighbors we get like this.
self.searchForNewPeers(address.cidOrTreeCid)
# FIXME: blocks should not blindly reschedule themselves. Instead,
# we should only reschedule a block if the peer drops, or we are
# in endgame mode.
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
# We wait for a bit and then retry. Since we've shipped our wantlists to
# connected peers, they might reply and we might request the block in the
# meantime as part of `blockPresenceHandler`.
await handle or sleepAsync(self.pendingBlocks.retryInterval)
# If we already got the block, we're done. Otherwise, we'll go for another
# cycle, potentially refreshing knowledge on peers again, and looking up
# the DHT again.
if handle.finished:
break
trace "No peers for block, will retry shortly"
continue
let scheduledPeer = peers.with.randomPeer
self.pendingBlocks.setInFlight(address, true)
scheduledPeer.blockRequested(address)
await self.sendWantBlock(@[address], scheduledPeer)
let activityTimer = scheduledPeer.activityTimer()
await handle or activityTimer # TODO: or peerDropped
activityTimer.cancel()
# XXX: we should probably not have this. Blocks should be retried
# to infinity unless cancelled by the client.
self.pendingBlocks.decRetries(address)
if handle.finished:
trace "Handle for block finished", failed = handle.failed
break
else:
# If the peer timed out, retries immediately.
trace "Dropping timed out peer.", peer = scheduledPeer.id
# TODO: disconnect peer
except CancelledError as exc:
trace "Block download cancelled"
if not handle.finished:
@ -353,6 +387,7 @@ proc blockPresenceHandler*(
for address in ourWantCids:
self.pendingBlocks.setInFlight(address, true)
self.pendingBlocks.decRetries(address)
peerCtx.blockRequested(address)
if ourWantCids.len > 0:
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
@ -407,15 +442,13 @@ proc cancelBlocks(
return entry.peerId
try:
# Does the peer have any of the blocks we're canceling?
for peerCtx in self.peers.peers.values:
let intersection = peerCtx.peerHave.intersection(addrSet)
# Have we requested any of the blocks we're cancelling to this peer?
let intersection = peerCtx.blocksRequested.intersection(addrSet)
if intersection.len > 0:
pendingCancellations[peerCtx.id] = intersection
# If so, dispatches cancellations.
# FIXME: we're still spamming peers - the fact that the peer has the block does
# not mean we've requested it.
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
toSeq(pendingCancellations.pairs).map(processPeer)
)
@ -424,6 +457,8 @@ proc cancelBlocks(
let ctx = self.peers.get(peerId)
if not ctx.isNil:
ctx.cleanPresence(addrs)
for address in pendingCancellations[peerId]:
ctx.blockRequestCancelled(address)
if failedFuts.len > 0:
warn "Failed to send block request cancellations to peers", peers = failedFuts.len
@ -498,6 +533,8 @@ proc blocksDeliveryHandler*(
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))
var validatedBlocksDelivery: seq[BlockDelivery]
let peerCtx = self.peers.get(peer)
for bd in blocksDelivery:
logScope:
peer = peer
@ -523,6 +560,9 @@ proc blocksDeliveryHandler*(
).errorOption:
warn "Unable to store proof and cid for a block"
continue
if peerCtx != nil:
peerCtx.blockReceived(bd.address)
except CatchableError as exc:
warn "Error handling block delivery", error = exc.msg
continue
@ -531,7 +571,6 @@ proc blocksDeliveryHandler*(
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
let peerCtx = self.peers.get(peer)
if peerCtx != nil:
if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption:
warn "Error paying for blocks", err = err.msg
@ -658,7 +697,7 @@ proc setupPeer*(
trace "Setting up peer", peer
if peer notin self.peers:
let peerCtx = BlockExcPeerCtx(id: peer)
let peerCtx = BlockExcPeerCtx(id: peer, activityTimeout: DefaultPeerActivityTimeout)
trace "Setting up new peer", peer
self.peers.add(peerCtx)
trace "Added peer", peers = self.peers.len
@ -707,14 +746,14 @@ proc taskHandler*(
# Send to the peer blocks he wants to get,
# if they present in our local store
# Blocks that are in flight have already been picked up by other tasks and
# Blocks that have been sent have already been picked up by other tasks and
# should not be re-sent.
var
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isInFlight(it))
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it))
sent: HashSet[BlockAddress]
for wantedBlock in wantedBlocks:
peerCtx.addInFlight(wantedBlock)
peerCtx.markBlockAsSent(wantedBlock)
try:
for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage):
@ -724,7 +763,7 @@ proc taskHandler*(
without blockDelivery =? await self.localLookup(wantedBlock), err:
error "Error getting block from local store",
err = err.msg, address = wantedBlock
peerCtx.removeInFlight(wantedBlock)
peerCtx.markBlockAsNotSent(wantedBlock)
continue
blockDeliveries.add(blockDelivery)
sent.incl(wantedBlock)
@ -743,7 +782,7 @@ proc taskHandler*(
finally:
# Better safe than sorry: if an exception does happen, we don't want to keep
# those in flight as it'll effectively prevent the blocks from ever being sent.
peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks)
peerCtx.blocksSent.keepItIf(it notin wantedBlocks)
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
## process tasks

View File

@ -34,7 +34,7 @@ declareGauge(
const
DefaultBlockRetries* = 3000
DefaultRetryInterval* = 180.seconds
DefaultRetryInterval* = 5.seconds
type
RetriesExhaustedError* = object of CatchableError

View File

@ -30,23 +30,25 @@ type BlockExcPeerCtx* = ref object of RootObj
blocks*: Table[BlockAddress, Presence] # remote peer have list including price
wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
account*: ?Account # ethereum account of this peer
paymentChannel*: ?ChannelId # payment channel id
blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer
blocksSent*: HashSet[BlockAddress] # blocks sent to peer
blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer
lastExchange*: Moment # last time peer has sent us a block
activityTimeout*: Duration
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
self.lastRefresh + 5.minutes < Moment.now()
proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocksInFlight
proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocksSent
proc addInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksInFlight.incl(address)
proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksSent.incl(address)
proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksInFlight.excl(address)
proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksSent.excl(address)
proc refreshed*(self: BlockExcPeerCtx) =
self.lastRefresh = Moment.now()
@ -77,3 +79,31 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 =
price += precense[].price
price
proc blockRequested*(self: BlockExcPeerCtx, address: BlockAddress) =
# We start counting the timeout from the first block requested.
if self.blocksRequested.len == 0:
self.lastExchange = Moment.now()
self.blocksRequested.incl(address)
proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksRequested.excl(address)
proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksRequested.excl(address)
self.lastExchange = Moment.now()
proc activityTimer*(
self: BlockExcPeerCtx
): Future[void] {.async: (raises: [CancelledError]).} =
## This is called by the block exchange when a block is scheduled for this peer.
## If the peer sends no blocks for a while, it is considered inactive/uncooperative
## and the peer is dropped. Note that ANY block that the peer sends will reset this
## timer for all blocks.
##
while true:
let idleTime = Moment.now() - self.lastExchange
if idleTime > self.activityTimeout:
return
await sleepAsync(self.activityTimeout - idleTime)