optimize remaining list joins so they're not quadratic

Authored by gmega on 2025-06-09 11:26:16 -03:00, committed by Chrysostomos Nanakos
parent 096eb118f9
commit 3c43d57497
4 changed files with 51 additions and 58 deletions
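
The recurring theme in this commit: several block-exchange hot paths filtered one collection by scanning a seq for every element of another, which is quadratic in the number of blocks. Keeping those collections as HashSets makes each membership test constant time on average. A minimal, self-contained sketch of the difference (illustrative names and types, not code from this repository):

import std/[sequtils, sets]

# With a seq, every `in` check is a linear scan, so filtering one
# collection by membership in another costs O(n * m).
proc quadraticJoin(wanted, have: seq[int]): seq[int] =
  wanted.filterIt(it in have)

# Converting one side to a HashSet once makes each `in` O(1) on average,
# so the same join drops to roughly O(n + m).
proc linearJoin(wanted: seq[int], have: HashSet[int]): seq[int] =
  wanted.filterIt(it in have)

echo linearJoin(@[2, 4, 6], toHashSet(@[1, 2, 3, 4])) # @[2, 4]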


@@ -314,7 +314,7 @@ proc blockPresenceHandler*(
   peerCtx.setPresence(presence)

   let
-    peerHave = peerCtx.peerHave.toHashSet
+    peerHave = peerCtx.peerHave
     dontWantCids = peerHave - ourWantList

   if dontWantCids.len > 0:
@@ -338,24 +338,23 @@ proc blockPresenceHandler*(
 proc scheduleTasks(
     self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
 ) {.async: (raises: [CancelledError]).} =
-  let cids = blocksDelivery.mapIt(it.blk.cid)
-
   # schedule any new peers to provide blocks to
   for p in self.peers:
-    for c in cids: # for each cid
+    for blockDelivery in blocksDelivery: # for each cid
       # schedule a peer if it wants at least one cid
       # and we have it in our local store
-      if c in p.peerWantsCids:
+      if blockDelivery.address in p.wantedBlocks:
+        let cid = blockDelivery.blk.cid
         try:
-          if await (c in self.localStore):
+          if await (cid in self.localStore):
             # TODO: the try/except should go away once blockstore tracks exceptions
             self.scheduleTask(p)
             break
         except CancelledError as exc:
-          warn "Checking local store canceled", cid = c, err = exc.msg
+          warn "Checking local store canceled", cid = cid, err = exc.msg
           return
         except CatchableError as exc:
-          error "Error checking local store for cid", cid = c, err = exc.msg
+          error "Error checking local store for cid", cid = cid, err = exc.msg
           raiseAssert "Unexpected error checking local store for cid"

 proc cancelBlocks(
@@ -519,14 +518,12 @@ proc wantListHandler*(
   try:
     for e in wantList.entries:
-      let idx = peerCtx.peerWants.findIt(it.address == e.address)
       logScope:
         peer = peerCtx.id
         address = e.address
         wantType = $e.wantType

-      if idx < 0: # Adding new entry to peer wants
+      if e.address notin peerCtx.wantedBlocks: # Adding new entry to peer wants
         let
           have =
             try:
@@ -562,25 +559,20 @@ proc wantListHandler*(
             codex_block_exchange_want_have_lists_received.inc()
           of WantType.WantBlock:
-            peerCtx.peerWants.add(e)
+            peerCtx.wantedBlocks.incl(e.address)
             schedulePeer = true
             codex_block_exchange_want_block_lists_received.inc()
       else: # Updating existing entry in peer wants
         # peer doesn't want this block anymore
         if e.cancel:
           trace "Canceling want for block", address = e.address
-          peerCtx.peerWants.del(idx)
+          peerCtx.wantedBlocks.excl(e.address)
           trace "Canceled block request",
-            address = e.address, len = peerCtx.peerWants.len
+            address = e.address, len = peerCtx.wantedBlocks.len
         else:
+          trace "Peer has requested a block more than once", address = e.address
           if e.wantType == WantType.WantBlock:
             schedulePeer = true
-          # peer might want to ask for the same cid with
-          # different want params
-          trace "Updating want for block", address = e.address
-          peerCtx.peerWants[idx] = e # update entry
-          trace "Updated block request",
-            address = e.address, len = peerCtx.peerWants.len

     if presence.len > 0:
       trace "Sending presence to remote", items = presence.mapIt($it).join(",")
@@ -645,20 +637,16 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
   self.peers.remove(peer)

 proc localLookup(
-    self: BlockExcEngine, e: WantListEntry
+    self: BlockExcEngine, address: BlockAddress
 ): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
-  if e.address.leaf:
-    (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
+  if address.leaf:
+    (await self.localStore.getBlockAndProof(address.treeCid, address.index)).map(
       (blkAndProof: (Block, CodexProof)) =>
-        BlockDelivery(
-          address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some
-        )
+        BlockDelivery(address: address, blk: blkAndProof[0], proof: blkAndProof[1].some)
     )
   else:
-    (await self.localStore.getBlock(e.address)).map(
-      (blk: Block) => BlockDelivery(
-        address: e.address, blk: blk, proof: CodexProof.none
-      )
+    (await self.localStore.getBlock(address)).map(
+      (blk: Block) => BlockDelivery(address: address, blk: blk, proof: CodexProof.none)
     )

 iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
@@ -680,40 +668,41 @@ proc taskHandler*(
   # Blocks that are in flight have already been picked up by other tasks and
   # should not be re-sent.
-  var wantedBlocks = peerCtx.peerWants.filterIt(
-    it.wantType == WantType.WantBlock and not peerCtx.isInFlight(it.address)
-  )
-  wantedBlocks.sort(SortOrder.Descending)
+  var
+    wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isInFlight(it))
+    sent: HashSet[BlockAddress]

   for wantedBlock in wantedBlocks:
-    peerCtx.addInFlight(wantedBlock.address)
+    peerCtx.addInFlight(wantedBlock)

   try:
-    for batch in wantedBlocks.splitBatches(self.maxBlocksPerMessage):
+    for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage):
       var blockDeliveries: seq[BlockDelivery]
       for wantedBlock in batch:
         # I/O is blocking so looking up blocks sequentially is fine.
         without blockDelivery =? await self.localLookup(wantedBlock), err:
           error "Error getting block from local store",
-            err = err.msg, address = wantedBlock.address
-          peerCtx.removeInFlight(wantedBlock.address)
+            err = err.msg, address = wantedBlock
+          peerCtx.removeInFlight(wantedBlock)
           continue
         blockDeliveries.add(blockDelivery)
+        sent.incl(wantedBlock)

       if blockDeliveries.len == 0:
         continue

       await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries)
       codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)

-      # Drops the batch from want list. Note that the send might still fail down the line
-      # and we will have removed them anyway, at which point we rely on the requester
-      # performing a retry for the request to succeed.
-      peerCtx.peerWants.keepItIf(it.address notin blockDeliveries.mapIt(it.address))
+      # Drops the batch from the peer's set of wanted blocks; i.e. assumes that after
+      # we send the blocks, the peer no longer wants them, so we don't need to
+      # re-send them. Note that the send might still fail down the line and we will
+      # have removed those anyway. At that point, we rely on the requester performing
+      # a retry for the request to succeed.
+      peerCtx.wantedBlocks.keepItIf(it notin sent)
   finally:
     # Better safe than sorry: if an exception does happen, we don't want to keep
     # those in flight as it'll effectively prevent the blocks from ever being sent.
-    peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks.mapIt(it.address))
+    peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks)

 proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
   ## process tasks
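
A note on the taskHandler hunk above: the old code pruned delivered entries with peerWants.keepItIf(it.address notin blockDeliveries.mapIt(it.address)), which rebuilds a seq and then scans it for every remaining want. The new code records delivered addresses in the sent HashSet and prunes in one linear pass. A rough sketch of that pattern, with hypothetical stand-in types rather than the real BlockAddress machinery:

import std/[sequtils, sets]

var
  pending = @["a", "b", "c", "d"] # stand-in for the peer's wanted blocks
  sent: HashSet[string]

for item in ["a", "c"]:
  # ... deliver the item ...
  sent.incl(item)

# One pass over `pending`, with O(1) average-case lookups into `sent`,
# instead of a `notin` scan of a freshly built seq per element.
pending.keepItIf(it notin sent)
echo pending # @["b", "d"]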


@@ -28,7 +28,7 @@ export payments, nitro
 type BlockExcPeerCtx* = ref object of RootObj
   id*: PeerId
   blocks*: Table[BlockAddress, Presence] # remote peer have list including price
-  peerWants*: seq[WantListEntry] # remote peers want lists
+  wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
   exchanged*: int # times peer has exchanged with us
   lastExchange*: Moment # last time peer has exchanged with us
   lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
@@ -37,7 +37,7 @@ type BlockExcPeerCtx* = ref object of RootObj
   blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer

 proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
-  self.lastRefresh + 15.seconds < Moment.now()
+  self.lastRefresh + 5.minutes < Moment.now()

 proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
   address in self.blocksInFlight
@@ -51,14 +51,11 @@ proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
 proc refreshed*(self: BlockExcPeerCtx) =
   self.lastRefresh = Moment.now()

-proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
-  toSeq(self.blocks.keys)
-
-proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
-  self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet
-
-proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
-  self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet
+proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
+  # XXX: this is ugly and inefficient, but since those will typically
+  # be used in "joins", it's better to pay the price here and have
+  # a linear join than to not do it and have a quadratic join.
+  toHashSet(self.blocks.keys.toSeq)

 proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
   address in self.blocks
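
The XXX above is the trade-off the commit leans on: peerHave now pays a single O(n) pass to copy the keys of the blocks table into a HashSet, so that callers such as blockPresenceHandler (first hunk) can compute peerHave - ourWantList as a linear set difference instead of a per-element scan. Roughly, assuming both operands are address sets (illustrative values):

import std/sets

let
  peerHave = toHashSet(["blk1", "blk2", "blk3"])
  ourWantList = toHashSet(["blk2"])

# Set difference walks peerHave once, with O(1) average lookups into ourWantList.
let dontWant = peerHave - ourWantList
echo dontWant # {"blk1", "blk3"} (iteration order not guaranteed)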


@@ -62,21 +62,23 @@ func len*(self: PeerCtxStore): int =
   self.peers.len

 func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
-  toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address))
+  toSeq(self.peers.values).filterIt(address in it.peerHave)

 func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
+  # FIXME: this is way slower and can end up leading to unexpected performance loss.
   toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))

 func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
-  toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address))
+  toSeq(self.peers.values).filterIt(address in it.wantedBlocks)

 func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
-  toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))
+  # FIXME: this is way slower and can end up leading to unexpected performance loss.
+  toSeq(self.peers.values).filterIt(it.wantedBlocks.anyIt(it.cidOrTreeCid == cid))

 proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
   var res: PeersForBlock = (@[], @[])
   for peer in self:
-    if peer.peerHave.anyIt(it == address):
+    if address in peer.peerHave:
       res.with.add(peer)
     else:
       res.without.add(peer)
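
The FIXMEs remain because the Cid overloads cannot use set membership: the sets are keyed by full BlockAddress, so anyIt(it.cidOrTreeCid == cid) still walks every address a peer has or wants. A small illustration, using (cid, index) tuples as a stand-in for BlockAddress:

import std/[sequtils, sets]

let wanted = toHashSet([("A", 0), ("A", 1), ("B", 0)])

# Membership by full address is O(1) on average...
let hasAddress = ("A", 1) in wanted
echo hasAddress # true

# ...but "does the peer want any block under this cid" has no such shortcut
# and must scan the whole set, just like the Cid overloads above.
echo wanted.anyIt(it[0] == "A") # true, O(len(wanted))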


@@ -25,6 +25,11 @@
   WantListEntry* = object
     address*: BlockAddress
+    # XXX: I think explicit priority is pointless as the peer will request
+    # the blocks in the order it wants to receive them, and all we have to
+    # do is process those in the same order as we send them back. It also
+    # complicates things for no reason at the moment, as the priority is
+    # always set to 0.
     priority*: int32 # The priority (normalized). default to 1
     cancel*: bool # Whether this revokes an entry
     wantType*: WantType # Note: defaults to enum 0, ie Block