feat: remove quadratic joins in cancelBlocks; use SafeAsyncIterator for getBlocks; limit memory usage for fetchBatched when used as prefetcher

This commit is contained in:
gmega 2025-06-30 17:30:47 -03:00
parent d94bfe60f6
commit db279d8fa9
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
6 changed files with 108 additions and 43 deletions

View File

@ -256,7 +256,7 @@ proc downloadInternal(
proc requestBlocks*(
self: BlockExcEngine, addresses: seq[BlockAddress]
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
): SafeAsyncIter[Block] =
var handles: seq[BlockHandle]
# Adds all blocks to pendingBlocks before calling the first downloadInternal. This will
# ensure that we don't send incomplete want lists.
@ -267,20 +267,27 @@ proc requestBlocks*(
for address in addresses:
self.trackedFutures.track(self.downloadInternal(address))
# TODO: we can reduce latency and improve download times
# by returning blocks out of order as futures complete.
var blocks: seq[?!Block]
for handle in handles:
try:
blocks.add(success await handle)
except CancelledError as err:
warn "Block request cancelled", addresses, err = err.msg
raise err
except CatchableError as err:
error "Error getting blocks from exchange engine", addresses, err = err.msg
blocks.add(Block.failure err)
var completed: int = 0
return blocks
proc isFinished(): bool =
completed == handles.len
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
# Be it success or failure, we're completing this future.
let value =
try:
success await handles[completed]
except CancelledError as err:
warn "Block request cancelled", addresses, err = err.msg
raise err
except CatchableError as err:
error "Error getting blocks from exchange engine", addresses, err = err.msg
failure err
inc(completed)
return value
return SafeAsyncIter[Block].new(genNext, isFinished)
proc requestBlock*(
self: BlockExcEngine, address: BlockAddress
@ -368,28 +375,42 @@ proc cancelBlocks(
## Tells neighboring peers that we're no longer interested in a block.
##
let addrSet = toHashSet(addrs)
var pendingCancellations: Table[PeerId, HashSet[BlockAddress]]
if self.peers.len == 0:
return
trace "Sending block request cancellations to peers",
addrs, peers = self.peers.peerIds
proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
proc processPeer(
entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]]
): Future[PeerId] {.async: (raises: [CancelledError]).} =
await self.network.request.sendWantCancellations(
peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx)
peer = entry.peerId, addresses = entry.addresses.toSeq
)
return peerCtx
return entry.peerId
try:
let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx](
toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map(
processPeer
)
# Does the peer have any of the blocks we're canceling?
for peerCtx in self.peers.peers.values:
let intersection = peerCtx.peerHave.intersection(addrSet)
if intersection.len > 0:
pendingCancellations[peerCtx.id] = intersection
# If so, dispatches cancellations.
# FIXME: we're still spamming peers - the fact that the peer has the block does
# not mean we've requested it.
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
toSeq(pendingCancellations.pairs).map(processPeer)
)
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx):
peerCtx.cleanPresence(addrs)
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId):
let ctx = self.peers.get(peerId)
if not ctx.isNil:
ctx.cleanPresence(addrs)
if failedFuts.len > 0:
warn "Failed to send block request cancellations to peers", peers = failedFuts.len
@ -539,6 +560,8 @@ proc wantListHandler*(
price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
if e.cancel:
# This is sort of expected if we sent the block to the peer, as we have removed
# it from the peer's wantlist ourselves.
trace "Received cancelation for untracked block, skipping",
address = e.address
continue

View File

@ -44,7 +44,7 @@ import ./indexingstrategy
import ./utils
import ./errors
import ./logutils
import ./utils/asynciter
import ./utils/safeasynciter
import ./utils/trackedfutures
export logutils
@ -194,20 +194,38 @@ proc fetchBatched*(
if not (await address in self.networkStore) or fetchLocal:
address
let
blockResults = await self.networkStore.getBlocks(addresses)
blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
numOfFailedBlocks = blockResults.len - blocks.len
proc successful(
blk: ?!bt.Block
): Future[bool] {.async: (raises: [CancelledError]).} =
return blk.isSuccess()
if numOfFailedBlocks > 0:
return
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
let blockResults = await self.networkStore.getBlocks(addresses)
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
var
successfulBlocks = 0
failedBlocks = 0
blockData: seq[bt.Block]
for res in blockResults:
without blk =? await res:
inc(failedBlocks)
continue
inc(successfulBlocks)
# Only retains block data in memory if there's
# a callback.
if not onBatch.isNil:
blockData.add(blk)
if failedBlocks > 0:
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
if not onBatch.isNil and batchErr =? (await onBatch(blockData)).errorOption:
return failure(batchErr)
if not iter.finished:
await sleepAsync(1.millis)
await idleAsync()
success()

View File

@ -65,6 +65,14 @@ method getBlock*(
raiseAssert("getBlock by addr not implemented!")
method getBlocks*(
self: BlockStore, addresses: seq[BlockAddress]
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
## Gets a set of blocks from the blockstore. Blocks might
## be returned in any order.
raiseAssert("getBlocks not implemented!")
method getBlockAndProof*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} =

View File

@ -31,24 +31,23 @@ type NetworkStore* = ref object of BlockStore
engine*: BlockExcEngine # blockexc decision engine
localStore*: BlockStore # local block store
proc getBlocks*(
method getBlocks*(
self: NetworkStore, addresses: seq[BlockAddress]
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
var
localBlocks: seq[?!Block]
localAddresses: seq[BlockAddress]
remoteAddresses: seq[BlockAddress]
# We can resolve local blocks sequentially as for now those are blocking anyway. Still:
# TODO: implement getBlocks for local store so we can delegate it here.
for address in addresses:
if not (await address in self.localStore):
remoteAddresses.add(address)
else:
localBlocks.add(await self.localStore.getBlock(address))
localAddresses.add(address)
let remoteBlocks = await self.engine.requestBlocks(remoteAddresses)
return localBlocks.concat(remoteBlocks)
return chain(
await self.localStore.getBlocks(localAddresses),
self.engine.requestBlocks(remoteAddresses),
)
method getBlock*(
self: NetworkStore, address: BlockAddress

View File

@ -38,6 +38,21 @@ logScope:
# BlockStore API
###########################################################
method getBlocks*(
self: RepoStore, addresses: seq[BlockAddress]
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
var i = 0
proc isFinished(): bool =
i == addresses.len
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
let value = await self.getBlock(addresses[i])
inc(i)
return value
return SafeAsyncIter[Block].new(genNext, isFinished)
method getBlock*(
self: RepoStore, cid: Cid
): Future[?!Block] {.async: (raises: [CancelledError]).} =

View File

@ -404,8 +404,10 @@ asyncchecksuite "Test SafeAsyncIter":
expect CancelledError:
for fut in iter2:
without i =? (await fut), err:
if i =? (await fut):
collected.add(i)
else:
fail()
check:
# We expect only values "0" and "1" to be collected