From e18996eed110ff49fde112554060fca2c08f4a72 Mon Sep 17 00:00:00 2001
From: gmega
Date: Mon, 9 Jun 2025 11:26:16 -0300
Subject: [PATCH] optimize remaining list joins so they're not quadratic

---
 codex/blockexchange/engine/engine.nim      | 77 ++++++++++------
 codex/blockexchange/peers/peercontext.nim  | 17 ++---
 codex/blockexchange/peers/peerctxstore.nim | 10 +--
 codex/blockexchange/protobuf/message.nim   |  5 ++
 4 files changed, 51 insertions(+), 58 deletions(-)

diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim
index 80986e70..25eabb71 100644
--- a/codex/blockexchange/engine/engine.nim
+++ b/codex/blockexchange/engine/engine.nim
@@ -308,7 +308,7 @@ proc blockPresenceHandler*(
   peerCtx.setPresence(presence)
 
   let
-    peerHave = peerCtx.peerHave.toHashSet
+    peerHave = peerCtx.peerHave
     dontWantCids = peerHave - ourWantList
 
   if dontWantCids.len > 0:
@@ -332,24 +332,23 @@
 proc scheduleTasks(
     self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
 ) {.async: (raises: [CancelledError]).} =
-  let cids = blocksDelivery.mapIt(it.blk.cid)
-
   # schedule any new peers to provide blocks to
   for p in self.peers:
-    for c in cids: # for each cid
+    for blockDelivery in blocksDelivery: # for each delivered block
       # schedule a peer if it wants at least one cid
       # and we have it in our local store
-      if c in p.peerWantsCids:
+      if blockDelivery.address in p.wantedBlocks:
+        let cid = blockDelivery.blk.cid
         try:
-          if await (c in self.localStore):
+          if await (cid in self.localStore):
             # TODO: the try/except should go away once blockstore tracks exceptions
             self.scheduleTask(p)
             break
         except CancelledError as exc:
-          warn "Checking local store canceled", cid = c, err = exc.msg
+          warn "Checking local store canceled", cid = cid, err = exc.msg
           return
         except CatchableError as exc:
-          error "Error checking local store for cid", cid = c, err = exc.msg
+          error "Error checking local store for cid", cid = cid, err = exc.msg
           raiseAssert "Unexpected error checking local store for cid"
 
 proc cancelBlocks(
@@ -513,14 +512,12 @@ proc wantListHandler*(
 
   try:
     for e in wantList.entries:
-      let idx = peerCtx.peerWants.findIt(it.address == e.address)
-
       logScope:
         peer = peerCtx.id
         address = e.address
         wantType = $e.wantType
 
-      if idx < 0: # Adding new entry to peer wants
+      if e.address notin peerCtx.wantedBlocks: # Adding new entry to peer wants
         let
           have =
             try:
@@ -556,25 +553,20 @@ proc wantListHandler*(
 
             codex_block_exchange_want_have_lists_received.inc()
           of WantType.WantBlock:
-            peerCtx.peerWants.add(e)
+            peerCtx.wantedBlocks.incl(e.address)
             schedulePeer = true
             codex_block_exchange_want_block_lists_received.inc()
       else: # Updating existing entry in peer wants
         # peer doesn't want this block anymore
         if e.cancel:
           trace "Canceling want for block", address = e.address
-          peerCtx.peerWants.del(idx)
+          peerCtx.wantedBlocks.excl(e.address)
           trace "Canceled block request",
-            address = e.address, len = peerCtx.peerWants.len
+            address = e.address, len = peerCtx.wantedBlocks.len
         else:
+          trace "Peer has requested a block more than once", address = e.address
           if e.wantType == WantType.WantBlock:
             schedulePeer = true
-          # peer might want to ask for the same cid with
-          # different want params
-          trace "Updating want for block", address = e.address
-          peerCtx.peerWants[idx] = e # update entry
-          trace "Updated block request",
-            address = e.address, len = peerCtx.peerWants.len
 
     if presence.len > 0:
       trace "Sending presence to remote", items = presence.mapIt($it).join(",")
@@ -639,20 +631,16 @@ proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
   self.peers.remove(peer)
 
 proc localLookup(
-    self: BlockExcEngine, e: WantListEntry
+    self: BlockExcEngine, address: BlockAddress
 ): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
-  if e.address.leaf:
-    (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
+  if address.leaf:
+    (await self.localStore.getBlockAndProof(address.treeCid, address.index)).map(
       (blkAndProof: (Block, CodexProof)) =>
-        BlockDelivery(
-          address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some
-        )
+        BlockDelivery(address: address, blk: blkAndProof[0], proof: blkAndProof[1].some)
     )
   else:
-    (await self.localStore.getBlock(e.address)).map(
-      (blk: Block) => BlockDelivery(
-        address: e.address, blk: blk, proof: CodexProof.none
-      )
+    (await self.localStore.getBlock(address)).map(
+      (blk: Block) => BlockDelivery(address: address, blk: blk, proof: CodexProof.none)
     )
 
 iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
@@ -674,40 +662,41 @@ proc taskHandler*(
   # Blocks that are in flight have already been picked up by other tasks and
   # should not be re-sent.
-  var wantedBlocks = peerCtx.peerWants.filterIt(
-    it.wantType == WantType.WantBlock and not peerCtx.isInFlight(it.address)
-  )
-
-  wantedBlocks.sort(SortOrder.Descending)
+  var
+    wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isInFlight(it))
+    sent: HashSet[BlockAddress]
 
   for wantedBlock in wantedBlocks:
-    peerCtx.addInFlight(wantedBlock.address)
+    peerCtx.addInFlight(wantedBlock)
 
   try:
-    for batch in wantedBlocks.splitBatches(self.maxBlocksPerMessage):
+    for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage):
       var blockDeliveries: seq[BlockDelivery]
       for wantedBlock in batch:
         # I/O is blocking so looking up blocks sequentially is fine.
         without blockDelivery =? await self.localLookup(wantedBlock), err:
           error "Error getting block from local store",
-            err = err.msg, address = wantedBlock.address
-          peerCtx.removeInFlight(wantedBlock.address)
+            err = err.msg, address = wantedBlock
+          peerCtx.removeInFlight(wantedBlock)
           continue
         blockDeliveries.add(blockDelivery)
+        sent.incl(wantedBlock)
 
       if blockDeliveries.len == 0:
         continue
 
       await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries)
       codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)
 
-      # Drops the batch from want list. Note that the send might still fail down the line
-      # and we will have removed them anyway, at which point we rely on the requester
-      # performing a retry for the request to succeed.
-      peerCtx.peerWants.keepItIf(it.address notin blockDeliveries.mapIt(it.address))
+      # Drops the batch from the peer's set of wanted blocks; i.e. assumes that once
+      # we have sent the blocks, the peer no longer wants them, so we don't need to
+      # re-send them. Note that the send might still fail down the line and we will
+      # have removed those anyway. At that point, we rely on the requester performing
+      # a retry for the request to succeed.
+      peerCtx.wantedBlocks.keepItIf(it notin sent)
   finally:
     # Better safe than sorry: if an exception does happen, we don't want to keep
     # those in flight as it'll effectively prevent the blocks from ever being sent.
-    peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks.mapIt(it.address))
+    peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks)
 
 proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
   ## process tasks
diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim
index efac8b43..857a8fa9 100644
--- a/codex/blockexchange/peers/peercontext.nim
+++ b/codex/blockexchange/peers/peercontext.nim
@@ -28,7 +28,7 @@ export payments, nitro
 type BlockExcPeerCtx* = ref object of RootObj
   id*: PeerId
   blocks*: Table[BlockAddress, Presence] # remote peer have list including price
-  peerWants*: seq[WantListEntry] # remote peers want lists
+  wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
   exchanged*: int # times peer has exchanged with us
   lastExchange*: Moment # last time peer has exchanged with us
   lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
@@ -37,7 +37,7 @@ type BlockExcPeerCtx* = ref object of RootObj
   blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer
 
 proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
-  self.lastRefresh + 15.seconds < Moment.now()
+  self.lastRefresh + 5.minutes < Moment.now()
 
 proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
   address in self.blocksInFlight
@@ -51,14 +51,11 @@ proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
 proc refreshed*(self: BlockExcPeerCtx) =
   self.lastRefresh = Moment.now()
 
-proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
-  toSeq(self.blocks.keys)
-
-proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
-  self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet
-
-proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
-  self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet
+proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
+  # XXX: this is ugly and inefficient, but since the result will typically
+  # be used in "joins", it's better to pay the conversion price here and
+  # get a linear join than to skip it and end up with a quadratic join.
+  toHashSet(self.blocks.keys.toSeq)
 
 proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
   address in self.blocks
diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim
index ce2506a8..171206ba 100644
--- a/codex/blockexchange/peers/peerctxstore.nim
+++ b/codex/blockexchange/peers/peerctxstore.nim
@@ -62,21 +62,23 @@ func len*(self: PeerCtxStore): int =
   self.peers.len
 
 func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
-  toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address))
+  toSeq(self.peers.values).filterIt(address in it.peerHave)
 
 func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
+  # FIXME: this is much slower than the address-based variant and can lead to unexpected performance loss.
   toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))
 
 func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
-  toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address))
+  toSeq(self.peers.values).filterIt(address in it.wantedBlocks)
 
 func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
+  # FIXME: this is much slower than the address-based variant and can lead to unexpected performance loss.
+  toSeq(self.peers.values).filterIt(it.wantedBlocks.anyIt(it.cidOrTreeCid == cid))
 
 proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
   var res: PeersForBlock = (@[], @[])
   for peer in self:
-    if peer.peerHave.anyIt(it == address):
+    if address in peer.peerHave:
       res.with.add(peer)
     else:
       res.without.add(peer)
diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim
index a3384416..00dbc57b 100644
--- a/codex/blockexchange/protobuf/message.nim
+++ b/codex/blockexchange/protobuf/message.nim
@@ -25,6 +25,11 @@ type
 
   WantListEntry* = object
     address*: BlockAddress
+    # XXX: I think an explicit priority is pointless, as the peer will
+    # request blocks in the order it wants to receive them, and all we
+    # have to do is process those requests in the same order in which
+    # they arrive. It also complicates things for no reason at the
+    # moment, as the priority is always set to 0.
    priority*: int32 # The priority (normalized). default to 1
    cancel*: bool # Whether this revokes an entry
    wantType*: WantType # Note: defaults to enum 0, ie Block
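
Editor's appendix (illustration only, not part of the patch): the XXX and FIXME
comments above argue that paying an up-front toHashSet conversion turns a
seq-against-seq join from quadratic into linear. A minimal self-contained Nim
sketch of that trade-off, with made-up sizes and names:

import std/[sets, sequtils, times]

let
  peerHave = (0 ..< 10_000).toSeq # blocks a peer advertises
  ourWants = (5_000 ..< 15_000).toSeq # blocks we are looking for

# Quadratic join: `it in peerHave` scans the whole seq, so the filter
# costs O(n * m) comparisons in total.
var t = cpuTime()
let slow = ourWants.filterIt(it in peerHave)
echo "seq join: ", cpuTime() - t, " s"

# Linear join: hash peerHave once, then each membership test is O(1)
# on average, for O(n + m) overall.
let haveSet = peerHave.toHashSet
t = cpuTime()
let fast = ourWants.filterIt(it in haveSet)
echo "set join: ", cpuTime() - t, " s"

assert slow == fast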
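
Likewise, the try/finally in taskHandler ensures in-flight claims are always
released, since a block left marked as in flight would never be re-sent. A
hedged sketch of just that pattern, using a hypothetical BlockAddress = int
stand-in and a no-op send:

import std/sets

type BlockAddress = int # hypothetical stand-in for codex's BlockAddress type

var blocksInFlight: HashSet[BlockAddress]

proc sendBatch(batch: seq[BlockAddress]) =
  # Stand-in for the real network send; may raise on link failure.
  discard

proc deliver(wanted: seq[BlockAddress]) =
  for b in wanted:
    blocksInFlight.incl(b) # claim the blocks so concurrent tasks skip them
  try:
    sendBatch(wanted)
  finally:
    # Runs on success, failure, and cancellation alike: always release the
    # claim, otherwise the blocks could never be picked up again.
    for b in wanted:
      blocksInFlight.excl(b)

when isMainModule:
  deliver(@[1, 2, 3])
  assert blocksInFlight.len == 0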