feat: cap how many blocks we can pack in a single message

This commit is contained in:
gmega 2025-06-06 12:56:56 -03:00 committed by Chrysostomos Nanakos
parent 475d31bef2
commit 1135a513d4
No known key found for this signature in database
4 changed files with 90 additions and 57 deletions

View File

@ -66,6 +66,10 @@ declareCounter(
const
DefaultMaxPeersPerRequest* = 10
# The default max message length of nim-libp2p is 100 megabytes, meaning we can
# in principle fit up to 1600 64k blocks per message, so 500 is well under
# that number.
DefaultMaxBlocksPerMessage = 500
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
@ -82,6 +86,8 @@ type
concurrentTasks: int # Number of concurrent peers we're serving at any given time
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
maxBlocksPerMessage: int
# Maximum number of blocks we can squeeze in a single message
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
wallet*: WalletRef # Nitro wallet for micropayments
pricing*: ?Pricing # Optional bandwidth pricing
@ -154,7 +160,9 @@ proc sendWantBlock(
) # we want this remote to send us a block
codex_block_exchange_want_block_lists_sent.inc()
proc refreshBlockKnowledge(self: BlockExcEngine, peer: BlockExcPeerCtx) {.async: (raises: [CancelledError]).} =
proc refreshBlockKnowledge(
self: BlockExcEngine, peer: BlockExcPeerCtx
) {.async: (raises: [CancelledError]).} =
# broadcast our want list, the other peer will do the same
if self.pendingBlocks.wantListLen > 0:
let cids = toSeq(self.pendingBlocks.wantList)
@ -214,6 +222,9 @@ proc downloadInternal(
await self.refreshBlockKnowledge()
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
# FIXME: blocks should not blindly reschedule themselves. Instead,
# we should only reschedule a block if the peer drops, or we are
# in endgame mode.
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
self.pendingBlocks.decRetries(address)
@ -231,7 +242,9 @@ proc downloadInternal(
finally:
self.pendingBlocks.setInFlight(address, false)
proc requestBlocks*(self: BlockExcEngine, addresses: seq[BlockAddress]): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
proc requestBlocks*(
self: BlockExcEngine, addresses: seq[BlockAddress]
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
var handles: seq[BlockHandle]
# Adds all blocks to pendingBlocks before calling the first downloadInternal. This will
# ensure that we don't send incomplete want lists.
@ -317,6 +330,7 @@ proc blockPresenceHandler*(
if ourWantCids.len > 0:
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
# FIXME: this will result in duplicate requests for blocks
if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption:
warn "Failed to send wantBlock to peer", peer, err = err.msg
@ -629,65 +643,73 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
# drop the peer from the peers table
self.peers.remove(peer)
proc localLookup(
    self: BlockExcEngine, e: WantListEntry
): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
  ## Fetches the block requested by want-list entry `e` from `self.localStore`
  ## and wraps it in a `BlockDelivery`.
  ##
  ## Leaf addresses are fetched together with their proof, which is attached to
  ## the delivery; any other address produces a delivery without a proof. The
  ## store's result is only mapped here, so a failed lookup is handed back to
  ## the caller unchanged.
  let address = e.address

  if not address.leaf:
    # Plain block: nothing to prove, so the delivery carries no proof.
    let plain = await self.localStore.getBlock(address)
    return plain.map(
      (found: Block) =>
        BlockDelivery(address: address, blk: found, proof: none(CodexProof))
    )

  # Leaf of a tree: the store returns the block and its proof as a pair.
  let withProof =
    await self.localStore.getBlockAndProof(address.treeCid, address.index)
  return withProof.map(
    (found: (Block, CodexProof)) =>
      BlockDelivery(address: address, blk: found[0], proof: some(found[1]))
  )
iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
  ## Yields consecutive, order-preserving chunks of `sequence`, each holding at
  ## most `batchSize` elements; only the last chunk may be shorter. Nothing is
  ## yielded for an empty `sequence`.
  ##
  ## A non-positive `batchSize` means "no limit": the whole sequence is yielded
  ## as a single batch. An empty batch is never yielded.
  let step = if batchSize > 0: batchSize else: sequence.len
  var start = 0
  while start < sequence.len:
    # Clamp against the remaining length rather than computing `start + step`
    # first, so a very large `batchSize` cannot overflow.
    let stop = start + min(step, sequence.len - start)
    yield sequence[start ..< stop]
    start = stop
proc taskHandler*(
self: BlockExcEngine, task: BlockExcPeerCtx
self: BlockExcEngine, peerCtx: BlockExcPeerCtx
) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} =
# Send the peer the blocks it wants,
# if they are present in our local store
# TODO: There should be all sorts of accounting of
# bytes sent/received here
# Blocks that are in flight have already been picked up by other tasks and
# should not be re-sent.
var wantedBlocks = peerCtx.peerWants.filterIt(
it.wantType == WantType.WantBlock and not peerCtx.isInFlight(it.address)
)
var wantsBlocks =
task.peerWants.filterIt(it.wantType == WantType.WantBlock and not it.inFlight)
wantedBlocks.sort(SortOrder.Descending)
proc updateInFlight(addresses: seq[BlockAddress], inFlight: bool) =
for peerWant in task.peerWants.mitems:
if peerWant.address in addresses:
peerWant.inFlight = inFlight
for wantedBlock in wantedBlocks:
peerCtx.addInFlight(wantedBlock.address)
if wantsBlocks.len > 0:
# Mark wants as in-flight.
let wantAddresses = wantsBlocks.mapIt(it.address)
updateInFlight(wantAddresses, true)
wantsBlocks.sort(SortOrder.Descending)
try:
for batch in wantedBlocks.splitBatches(self.maxBlocksPerMessage):
var blockDeliveries: seq[BlockDelivery]
for wantedBlock in batch:
# I/O is blocking so looking up blocks sequentially is fine.
without blockDelivery =? await self.localLookup(wantedBlock), err:
error "Error getting block from local store",
err = err.msg, address = wantedBlock.address
peerCtx.removeInFlight(wantedBlock.address)
continue
blockDeliveries.add(blockDelivery)
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
if e.address.leaf:
(await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
(blkAndProof: (Block, CodexProof)) =>
BlockDelivery(
address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some
)
)
else:
(await self.localStore.getBlock(e.address)).map(
(blk: Block) =>
BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none)
)
let
blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup))
blocksDelivery = blocksDeliveryFut.filterIt(it.completed and it.value.isOk).mapIt:
if bd =? it.value:
bd
else:
raiseAssert "Unexpected error in local lookup"
# All the wants that failed local lookup must be set to not-in-flight again.
let
successAddresses = blocksDelivery.mapIt(it.address)
failedAddresses = wantAddresses.filterIt(it notin successAddresses)
updateInFlight(failedAddresses, false)
if blocksDelivery.len > 0:
trace "Sending blocks to peer",
peer = task.id, blocks = (blocksDelivery.mapIt(it.address))
await self.network.request.sendBlocksDelivery(task.id, blocksDelivery)
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
task.peerWants.keepItIf(it.address notin successAddresses)
await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries)
codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)
# Drops the batch from want list. Note that the send might still fail down the line
# and we will have removed them anyway, at which point we rely on the requester
# performing a retry for the request to succeed.
peerCtx.peerWants.keepItIf(it.address notin blockDeliveries.mapIt(it.address))
finally:
# Better safe than sorry: if an exception does happen, we don't want to keep
# those in flight as it'll effectively prevent the blocks from ever being sent.
peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks.mapIt(it.address))
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
## process tasks
@ -712,6 +734,7 @@ proc new*(
advertiser: Advertiser,
peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager,
maxBlocksPerMessage = DefaultMaxBlocksPerMessage,
concurrentTasks = DefaultConcurrentTasks,
): BlockExcEngine =
## Create new block exchange engine instance
@ -725,6 +748,7 @@ proc new*(
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures(),
maxBlocksPerMessage: maxBlocksPerMessage,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,

View File

@ -34,7 +34,7 @@ declareGauge(
const
DefaultBlockRetries* = 3000
DefaultRetryInterval* = 10.seconds
DefaultRetryInterval* = 180.seconds
type
RetriesExhaustedError* = object of CatchableError
@ -124,9 +124,6 @@ proc resolve*(
blockReq.handle.complete(bd.blk)
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
if retrievalDurationUs > 500000:
warn "High block retrieval time", retrievalDurationUs, address = bd.address
else:
trace "Block handle already finished", address = bd.address

View File

@ -34,10 +34,23 @@ type BlockExcPeerCtx* = ref object of RootObj
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
account*: ?Account # ethereum account of this peer
paymentChannel*: ?ChannelId # payment channel id
blocksInFlight*: seq[BlockAddress] # blocks in flight towards peer
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
  ## True once more than 15 seconds have elapsed since `lastRefresh`.
  let staleAfter = self.lastRefresh + 15.seconds
  Moment.now() > staleAfter
proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
  ## True when `address` is currently listed in `blocksInFlight`.
  self.blocksInFlight.contains(address)
proc addInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Records `address` as in flight; an address that is already listed is not
  ## added a second time.
  if self.isInFlight(address):
    return
  self.blocksInFlight.add(address)
proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
  ## Drops `address` from `blocksInFlight`; does nothing when it is not listed.
  let position = self.blocksInFlight.find(address)
  if position < 0:
    return
  self.blocksInFlight.delete(position)
proc refreshed*(self: BlockExcPeerCtx) =
self.lastRefresh = Moment.now()

View File

@ -29,7 +29,6 @@ type
cancel*: bool # Whether this revokes an entry
wantType*: WantType # Note: defaults to enum 0, ie Block
sendDontHave*: bool # Note: defaults to false
inFlight*: bool # Whether block sending is in progress. Not serialized.
WantList* = object
entries*: seq[WantListEntry] # A list of wantList entries