feat: add dataset request batching

Authored by gmega on 2025-06-05 15:30:08 -03:00, committed by Chrysostomos Nanakos
parent f791a960f2
commit 475d31bef2
6 changed files with 95 additions and 22 deletions
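
In outline: instead of requesting blocks one at a time, callers now hand a whole seq[BlockAddress] down the stack (Node.fetchBatched → NetworkStore.getBlocks → BlockExcEngine.requestBlocks), and the engine registers every address in pendingBlocks before the first download starts, so peers see one coarse want list. A minimal caller-side sketch; `store` and `cid` are hypothetical stand-ins for an already-constructed NetworkStore and a dataset CID with at least three blocks:

    # Hedged sketch of the new batched path: one call, seq[?!Block] back.
    let addresses = @[
      BlockAddress.init(cid, 0), BlockAddress.init(cid, 1), BlockAddress.init(cid, 2)
    ]
    let results = await store.getBlocks(addresses)
    for res in results:
      if res.isSuccess:
        echo "fetched block: ", res.value.cid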


@@ -78,6 +78,8 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
         trace "Discovery request already in progress", cid
         continue
 
+      trace "Running discovery task for cid", cid
+
       let haves = b.peers.peersHave(cid)
       if haves.len < b.minPeersPerBlock:


@@ -154,6 +154,28 @@ proc sendWantBlock(
   ) # we want this remote to send us a block
 
   codex_block_exchange_want_block_lists_sent.inc()
 
+proc refreshBlockKnowledge(self: BlockExcEngine, peer: BlockExcPeerCtx) {.async: (raises: [CancelledError]).} =
+  # broadcast our want list, the other peer will do the same
+  if self.pendingBlocks.wantListLen > 0:
+    let cids = toSeq(self.pendingBlocks.wantList)
+    trace "Sending our want list to a peer", peer = peer.id, length = cids.len
+    await self.network.request.sendWantList(peer.id, cids, full = true)
+
+proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
+  for peer in self.peers.peers.values:
+    # We refresh block knowledge if:
+    # 1. the peer hasn't been refreshed in a while;
+    # 2. the list of blocks we care about has actually changed.
+    #
+    # Note that because of (2), it is important that we update our
+    # want list in the coarsest way possible, rather than through many
+    # small updates.
+    #
+    # In dynamic swarms, staleness will dominate latency.
+    if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
+      await self.refreshBlockKnowledge(peer)
+      peer.refreshed()
+
 proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
   Rng.instance.sample(peers)
@@ -189,7 +211,7 @@ proc downloadInternal(
       else:
         self.pendingBlocks.setInFlight(address, false)
         if peers.without.len > 0:
-          await self.sendWantHave(@[address], peers.without)
+          await self.refreshBlockKnowledge()
         self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
 
       await (handle or sleepAsync(self.pendingBlocks.retryInterval))
@@ -209,6 +231,32 @@ proc downloadInternal(
   finally:
     self.pendingBlocks.setInFlight(address, false)
 
+proc requestBlocks*(
+    self: BlockExcEngine, addresses: seq[BlockAddress]
+): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
+  var handles: seq[BlockHandle]
+
+  # Adds all blocks to pendingBlocks before calling the first downloadInternal. This will
+  # ensure that we don't send incomplete want lists.
+  for address in addresses:
+    if address notin self.pendingBlocks:
+      handles.add(self.pendingBlocks.getWantHandle(address))
+
+  for address in addresses:
+    self.trackedFutures.track(self.downloadInternal(address))
+
+  # TODO: we can reduce latency and improve download times
+  # by returning blocks out of order as futures complete.
+  var blocks: seq[?!Block]
+  for handle in handles:
+    try:
+      blocks.add(success await handle)
+    except CancelledError as err:
+      warn "Block request cancelled", addresses, err = err.msg
+      raise err
+    except CatchableError as err:
+      error "Error getting blocks from exchange engine", addresses, err = err.msg
+      blocks.add(Block.failure err)
+
+  return blocks
+
 proc requestBlock*(
     self: BlockExcEngine, address: BlockAddress
 ): Future[?!Block] {.async: (raises: [CancelledError]).} =
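
A hedged usage sketch for the engine-level entry point above; `engine` and `addresses` are assumed to exist, and sequtils is assumed imported. requestBlocks resolves once every handle settles and returns per-block Results in handle order, folding per-block errors into the returned seq[?!Block] rather than aborting the batch:

    # Count failures without aborting the batch.
    let outcomes = await engine.requestBlocks(addresses)
    let failed = outcomes.filterIt(not it.isSuccess()).len
    if failed > 0:
      trace "Batch completed with failures", failed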
@@ -239,7 +287,7 @@ proc completeBlock*(self: BlockExcEngine, address: BlockAddress, blk: Block) =
 proc blockPresenceHandler*(
     self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
 ) {.async: (raises: []).} =
-  trace "Received block presence from peer", peer, blocks = blocks.mapIt($it)
+  trace "Received block presence from peer", peer, len = blocks.len
   let
     peerCtx = self.peers.get(peer)
     ourWantList = toSeq(self.pendingBlocks.wantList)
@@ -482,12 +530,14 @@ proc wantListHandler*(
       case e.wantType
       of WantType.WantHave:
         if have:
+          trace "We HAVE the block", address = e.address
           presence.add(
             BlockPresence(
               address: e.address, `type`: BlockPresenceType.Have, price: price
             )
           )
         else:
+          trace "We DON'T HAVE the block", address = e.address
           if e.sendDontHave:
             presence.add(
               BlockPresence(
@@ -560,15 +610,11 @@ proc setupPeer*(
   trace "Setting up peer", peer
 
   if peer notin self.peers:
+    let peerCtx = BlockExcPeerCtx(id: peer)
     trace "Setting up new peer", peer
-    self.peers.add(BlockExcPeerCtx(id: peer))
+    self.peers.add(peerCtx)
     trace "Added peer", peers = self.peers.len
-
-  # broadcast our want list, the other peer will do the same
-  if self.pendingBlocks.wantListLen > 0:
-    trace "Sending our want list to a peer", peer
-    let cids = toSeq(self.pendingBlocks.wantList)
-    await self.network.request.sendWantList(peer, cids, full = true)
+    await self.refreshBlockKnowledge(peerCtx)
 
   if address =? self.pricing .? address:
     trace "Sending account to peer", peer


@@ -34,7 +34,7 @@ declareGauge(
 const
   DefaultBlockRetries* = 3000
-  DefaultRetryInterval* = 500.millis
+  DefaultRetryInterval* = 10.seconds
 
 type
   RetriesExhaustedError* = object of CatchableError
@@ -50,6 +50,7 @@ type
     blockRetries*: int = DefaultBlockRetries
     retryInterval*: Duration = DefaultRetryInterval
     blocks*: Table[BlockAddress, BlockReq] # pending Block requests
+    lastInclusion*: Moment # time at which we last included a block in our want list
 
 proc updatePendingBlockGauge(p: PendingBlocksManager) =
   codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
@@ -70,6 +71,8 @@ proc getWantHandle*(
       startTime: getMonoTime().ticks,
     )
     self.blocks[address] = blk
+    self.lastInclusion = Moment.now()
+
     let handle = blk.handle
 
 proc cleanUpBlock(data: pointer) {.raises: [].} =


@@ -31,9 +31,16 @@ type BlockExcPeerCtx* = ref object of RootObj
   peerWants*: seq[WantListEntry] # remote peer's want list
   exchanged*: int # times peer has exchanged with us
   lastExchange*: Moment # last time peer has exchanged with us
+  lastRefresh*: Moment # last time we refreshed our knowledge of this peer's blocks
   account*: ?Account # ethereum account of this peer
   paymentChannel*: ?ChannelId # payment channel id
 
+proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
+  self.lastRefresh + 15.seconds < Moment.now()
+
+proc refreshed*(self: BlockExcPeerCtx) =
+  self.lastRefresh = Moment.now()
+
 proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
   toSeq(self.blocks.keys)
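
The helpers above make the engine's refresh predicate cheap to evaluate: a peer's knowledge goes stale 15 seconds after lastRefresh, independently of want-list changes. A small self-contained sketch of the window arithmetic, using chronos Moment as the fields above do (the 16-second offset is illustrative):

    import pkg/chronos # Moment, seconds

    # A peer refreshed just now is fresh; one refreshed 16s ago is stale.
    # The 15.seconds threshold mirrors isKnowledgeStale above.
    let now = Moment.now()
    assert not (now + 15.seconds < now) # refreshed now: fresh
    assert (now - 16.seconds) + 15.seconds < now # refreshed 16s ago: stale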


@@ -52,7 +52,7 @@ export logutils
 logScope:
   topics = "codex node"
 
-const DefaultFetchBatch = 10
+const DefaultFetchBatch = 1_000_000
 
 type
   Contracts* =
@@ -187,23 +187,18 @@ proc fetchBatched*(
   # )
   while not iter.finished:
-    let blockFutures = collect:
+    let addresses = collect:
       for i in 0 ..< batchSize:
         if not iter.finished:
           let address = BlockAddress.init(cid, iter.next())
           if not (await address in self.networkStore) or fetchLocal:
-            self.networkStore.getBlock(address)
+            address
 
-    if blockFutures.len == 0:
-      continue
-
-    without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
-      trace "Some blocks failed to fetch", err = err.msg
-      return failure(err)
-
-    let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
-    let numOfFailedBlocks = blockResults.len - blocks.len
+    let
+      blockResults = await self.networkStore.getBlocks(addresses)
+      blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
+      numOfFailedBlocks = blockResults.len - blocks.len
+
     if numOfFailedBlocks > 0:
       return
         failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")


@@ -31,6 +31,26 @@ type NetworkStore* = ref object of BlockStore
   engine*: BlockExcEngine # blockexc decision engine
   localStore*: BlockStore # local block store
 
+proc getBlocks*(
+    self: NetworkStore, addresses: seq[BlockAddress]
+): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
+  var
+    localBlocks: seq[?!Block]
+    remoteAddresses: seq[BlockAddress]
+
+  # We can resolve local blocks sequentially, as for now those are blocking anyway. Still:
+  # TODO: implement getBlocks for the local store so we can delegate to it here.
+  for address in addresses:
+    if not (await address in self.localStore):
+      remoteAddresses.add(address)
+    else:
+      localBlocks.add(await self.localStore.getBlock(address))
+
+  let remoteBlocks = await self.engine.requestBlocks(remoteAddresses)
+  return localBlocks.concat(remoteBlocks)
+
 method getBlock*(
     self: NetworkStore, address: BlockAddress
 ): Future[?!Block] {.async: (raises: [CancelledError]).} =
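
One property of getBlocks worth noting: local hits are concatenated ahead of remote results, so output order can differ from input address order. A store-free sketch of that reordering, with the "only B is local" condition as a hypothetical stand-in for the localStore membership test:

    import std/sequtils

    # If addresses = @["A", "B", "C"] and only "B" is local, getBlocks
    # returns local hits first: @["B", "A", "C"].
    let addresses = @["A", "B", "C"]
    let localHits = addresses.filterIt(it == "B")
    let remoteHits = addresses.filterIt(it != "B")
    assert concat(localHits, remoteHits) == @["B", "A", "C"]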