diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index fcc6563d..6a29c35c 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -73,6 +73,9 @@ const DefaultMaxBlocksPerMessage = 500 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 + # Don't do more than one discovery request per `DiscoveryRateLimit` seconds. + DiscoveryRateLimit = 1.seconds + DefaultPeerActivityTimeout = 1.minutes type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} @@ -94,6 +97,7 @@ type pricing*: ?Pricing # Optional bandwidth pricing discovery*: DiscoveryEngine advertiser*: Advertiser + lastDiscRequest: Moment # time of last discovery request Pricing* = object address*: EthAddress @@ -193,6 +197,14 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr # efficient about it. await self.refreshBlockKnowledge(peer) +proc searchForNewPeers(self: BlockExcEngine, cid: Cid) = + if self.lastDiscRequest + DiscoveryRateLimit < Moment.now(): + trace "Searching for new peers for", cid = cid + self.lastDiscRequest = Moment.now() # always refresh before calling await! + self.discovery.queueFindBlocksReq(@[cid]) + else: + trace "Not searching for new peers, rate limit not expired", cid = cid + proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = Rng.instance.sample(peers) @@ -215,34 +227,56 @@ proc downloadInternal( handle.fail(newException(RetriesExhaustedError, "Error retries exhausted")) break - trace "Running retry handle" let peers = self.peers.getPeersForBlock(address) logScope: peersWith = peers.with.len peersWithout = peers.without.len - trace "Peers for block" - if peers.with.len > 0: - self.pendingBlocks.setInFlight(address, true) - await self.sendWantBlock(@[address], peers.with.randomPeer) - else: - self.pendingBlocks.setInFlight(address, false) + if peers.with.len == 0: + # We know of no peers that have the block. if peers.without.len > 0: - # We have peers connected, but none of them have the block. This + # If we have peers connected but none of them have the block, this # could be because our knowledge about what they have has run stale. # Tries to refresh it. await self.refreshBlockKnowledge() - self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + # Also tries to look for new peers for good measure. + # TODO: in the future, peer search and knowledge maintenance should + # be completely decoupled from one another. It is very hard to + # control what happens and how many neighbors we get like this. + self.searchForNewPeers(address.cidOrTreeCid) - # FIXME: blocks should not blindly reschedule themselves. Instead, - # we should only reschedule a block if the peer drops, or we are - # in endgame mode. - await (handle or sleepAsync(self.pendingBlocks.retryInterval)) + # We wait for a bit and then retry. Since we've shipped our wantlists to + # connected peers, they might reply and we might request the block in the + # meantime as part of `blockPresenceHandler`. + await handle or sleepAsync(self.pendingBlocks.retryInterval) + # If we already got the block, we're done. Otherwise, we'll go for another + # cycle, potentially refreshing knowledge on peers again, and looking up + # the DHT again. + if handle.finished: + break + trace "No peers for block, will retry shortly" + continue + + let scheduledPeer = peers.with.randomPeer + self.pendingBlocks.setInFlight(address, true) + scheduledPeer.blockRequested(address) + await self.sendWantBlock(@[address], scheduledPeer) + + let activityTimer = scheduledPeer.activityTimer() + await handle or activityTimer # TODO: or peerDropped + activityTimer.cancel() + + # XXX: we should probably not have this. Blocks should be retried + # to infinity unless cancelled by the client. self.pendingBlocks.decRetries(address) if handle.finished: trace "Handle for block finished", failed = handle.failed break + else: + # If the peer timed out, retries immediately. + trace "Dropping timed out peer.", peer = scheduledPeer.id + # TODO: disconnect peer except CancelledError as exc: trace "Block download cancelled" if not handle.finished: @@ -353,6 +387,7 @@ proc blockPresenceHandler*( for address in ourWantCids: self.pendingBlocks.setInFlight(address, true) self.pendingBlocks.decRetries(address) + peerCtx.blockRequested(address) if ourWantCids.len > 0: trace "Peer has blocks in our wantList", peer, wants = ourWantCids @@ -407,15 +442,13 @@ proc cancelBlocks( return entry.peerId try: - # Does the peer have any of the blocks we're canceling? for peerCtx in self.peers.peers.values: - let intersection = peerCtx.peerHave.intersection(addrSet) + # Have we requested any of the blocks we're cancelling to this peer? + let intersection = peerCtx.blocksRequested.intersection(addrSet) if intersection.len > 0: pendingCancellations[peerCtx.id] = intersection # If so, dispatches cancellations. - # FIXME: we're still spamming peers - the fact that the peer has the block does - # not mean we've requested it. let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId]( toSeq(pendingCancellations.pairs).map(processPeer) ) @@ -424,6 +457,8 @@ proc cancelBlocks( let ctx = self.peers.get(peerId) if not ctx.isNil: ctx.cleanPresence(addrs) + for address in pendingCancellations[peerId]: + ctx.blockRequestCancelled(address) if failedFuts.len > 0: warn "Failed to send block request cancellations to peers", peers = failedFuts.len @@ -498,6 +533,8 @@ proc blocksDeliveryHandler*( trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) var validatedBlocksDelivery: seq[BlockDelivery] + let peerCtx = self.peers.get(peer) + for bd in blocksDelivery: logScope: peer = peer @@ -523,6 +560,9 @@ proc blocksDeliveryHandler*( ).errorOption: warn "Unable to store proof and cid for a block" continue + + if peerCtx != nil: + peerCtx.blockReceived(bd.address) except CatchableError as exc: warn "Error handling block delivery", error = exc.msg continue @@ -531,7 +571,6 @@ proc blocksDeliveryHandler*( codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) - let peerCtx = self.peers.get(peer) if peerCtx != nil: if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: warn "Error paying for blocks", err = err.msg @@ -658,7 +697,7 @@ proc setupPeer*( trace "Setting up peer", peer if peer notin self.peers: - let peerCtx = BlockExcPeerCtx(id: peer) + let peerCtx = BlockExcPeerCtx(id: peer, activityTimeout: DefaultPeerActivityTimeout) trace "Setting up new peer", peer self.peers.add(peerCtx) trace "Added peer", peers = self.peers.len @@ -707,14 +746,14 @@ proc taskHandler*( # Send to the peer blocks he wants to get, # if they present in our local store - # Blocks that are in flight have already been picked up by other tasks and + # Blocks that have been sent have already been picked up by other tasks and # should not be re-sent. var - wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isInFlight(it)) + wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it)) sent: HashSet[BlockAddress] for wantedBlock in wantedBlocks: - peerCtx.addInFlight(wantedBlock) + peerCtx.markBlockAsSent(wantedBlock) try: for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage): @@ -724,7 +763,7 @@ proc taskHandler*( without blockDelivery =? await self.localLookup(wantedBlock), err: error "Error getting block from local store", err = err.msg, address = wantedBlock - peerCtx.removeInFlight(wantedBlock) + peerCtx.markBlockAsNotSent(wantedBlock) continue blockDeliveries.add(blockDelivery) sent.incl(wantedBlock) @@ -743,7 +782,7 @@ proc taskHandler*( finally: # Better safe than sorry: if an exception does happen, we don't want to keep # those in flight as it'll effectively prevent the blocks from ever being sent. - peerCtx.blocksInFlight.keepItIf(it notin wantedBlocks) + peerCtx.blocksSent.keepItIf(it notin wantedBlocks) proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} = ## process tasks diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index eee66bc2..d46d3161 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -34,7 +34,7 @@ declareGauge( const DefaultBlockRetries* = 3000 - DefaultRetryInterval* = 180.seconds + DefaultRetryInterval* = 5.seconds type RetriesExhaustedError* = object of CatchableError diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 857a8fa9..cca6d387 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -30,23 +30,25 @@ type BlockExcPeerCtx* = ref object of RootObj blocks*: Table[BlockAddress, Presence] # remote peer have list including price wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants exchanged*: int # times peer has exchanged with us - lastExchange*: Moment # last time peer has exchanged with us lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id - blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer + blocksSent*: HashSet[BlockAddress] # blocks sent to peer + blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer + lastExchange*: Moment # last time peer has sent us a block + activityTimeout*: Duration proc isKnowledgeStale*(self: BlockExcPeerCtx): bool = self.lastRefresh + 5.minutes < Moment.now() -proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool = - address in self.blocksInFlight +proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool = + address in self.blocksSent -proc addInFlight*(self: BlockExcPeerCtx, address: BlockAddress) = - self.blocksInFlight.incl(address) +proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) = + self.blocksSent.incl(address) -proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) = - self.blocksInFlight.excl(address) +proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) = + self.blocksSent.excl(address) proc refreshed*(self: BlockExcPeerCtx) = self.lastRefresh = Moment.now() @@ -77,3 +79,31 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 = price += precense[].price price + +proc blockRequested*(self: BlockExcPeerCtx, address: BlockAddress) = + # We start counting the timeout from the first block requested. + if self.blocksRequested.len == 0: + self.lastExchange = Moment.now() + self.blocksRequested.incl(address) + +proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) = + self.blocksRequested.excl(address) + +proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress) = + self.blocksRequested.excl(address) + self.lastExchange = Moment.now() + +proc activityTimer*( + self: BlockExcPeerCtx +): Future[void] {.async: (raises: [CancelledError]).} = + ## This is called by the block exchange when a block is scheduled for this peer. + ## If the peer sends no blocks for a while, it is considered inactive/uncooperative + ## and the peer is dropped. Note that ANY block that the peer sends will reset this + ## timer for all blocks. + ## + while true: + let idleTime = Moment.now() - self.lastExchange + if idleTime > self.activityTimeout: + return + + await sleepAsync(self.activityTimeout - idleTime)