diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 17a5d820..3a2d8218 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -175,9 +175,14 @@ proc refreshBlockKnowledge( self: BlockExcEngine, peer: BlockExcPeerCtx ) {.async: (raises: [CancelledError]).} = if self.pendingBlocks.wantListLen > 0: - let cids = toSeq(self.pendingBlocks.wantList) - trace "Sending our want list to a peer", peer = peer.id, length = cids.len - await self.network.request.sendWantList(peer.id, cids, full = true) + # We send only blocks that the peer hasn't already told us that they already have. + let + peerHave = peer.peerHave + toAsk = self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave) + + if toAsk.len > 0: + trace "Sending want list to a peer", peer = peer.id, length = toAsk.len + await self.network.request.sendWantList(peer.id, toAsk, full = true) proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} = for peer in self.peers.peers.values.toSeq: @@ -189,15 +194,13 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr # want list in the coarsest way possible instead of over many # small updates. # + if peer.refreshInProgress: + trace "Peer refresh in progress", peer = peer.id + continue + # In dynamic swarms, staleness will dominate latency. if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale: - # FIXME: we update the lastRefresh before actually refreshing because otherwise - # a slow peer will be bombarded with requests. If the request does fail or the - # peer does not reply, a retrying block will eventually issue this again. This - # is a complex and convoluted flow - ideally we should simply be tracking this - # request and retrying it on the absence of a response, eventually disconnecting - # the peer if it consistently fails to respond. - peer.refreshed() + peer.refreshRequested() # TODO: optimize this by keeping track of what was sent and sending deltas. # This should allow us to run much more frequent refreshes, and be way more # efficient about it. @@ -399,6 +402,8 @@ proc blockPresenceHandler*( if peerCtx.isNil: return + peerCtx.refreshReplied() + for blk in blocks: if presence =? Presence.init(blk): peerCtx.setPresence(presence) diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index c917b7ee..0e0d1060 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -25,12 +25,18 @@ import ../../logutils export payments, nitro +const + MinRefreshInterval = 5.seconds + MaxRefreshBackoff = 36 # 3 minutes + type BlockExcPeerCtx* = ref object of RootObj id*: PeerId blocks*: Table[BlockAddress, Presence] # remote peer have list including price wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants exchanged*: int # times peer has exchanged with us + refreshInProgress*: bool # indicates if a refresh is in progress lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has + refreshBackoff*: int = 1 # backoff factor for refresh requests account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id blocksSent*: HashSet[BlockAddress] # blocks sent to peer @@ -39,7 +45,7 @@ type BlockExcPeerCtx* = ref object of RootObj activityTimeout*: Duration proc isKnowledgeStale*(self: BlockExcPeerCtx): bool = - self.lastRefresh + 5.minutes < Moment.now() + self.lastRefresh + self.refreshBackoff * MinRefreshInterval < Moment.now() proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool = address in self.blocksSent @@ -50,9 +56,19 @@ proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) = proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) = self.blocksSent.excl(address) -proc refreshed*(self: BlockExcPeerCtx) = +proc refreshRequested*(self: BlockExcPeerCtx) = + trace "Refresh requested for peer", peer = self.id, backoff = self.refreshBackoff + self.refreshInProgress = true self.lastRefresh = Moment.now() +proc refreshReplied*(self: BlockExcPeerCtx) = + self.refreshInProgress = false + self.lastRefresh = Moment.now() + self.refreshBackoff = min(self.refreshBackoff * 2, MaxRefreshBackoff) + +proc havesUpdated(self: BlockExcPeerCtx) = + self.refreshBackoff = 1 + proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] = # XXX: this is ugly an inefficient, but since those will typically # be used in "joins", it's better to pay the price here and have @@ -63,6 +79,9 @@ proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool = address in self.blocks func setPresence*(self: BlockExcPeerCtx, presence: Presence) = + if presence.address notin self.blocks: + self.havesUpdated() + self.blocks[presence.address] = presence func cleanPresence*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]) =