Mirror of https://github.com/logos-storage/logos-storage-nim.git (synced 2026-01-05 15:03:07 +00:00)
feat: add stopgap "adaptive" refresh
This commit is contained in:
parent b274b29b83
commit d7c403edfe
@@ -175,9 +175,14 @@ proc refreshBlockKnowledge(
     self: BlockExcEngine, peer: BlockExcPeerCtx
 ) {.async: (raises: [CancelledError]).} =
-  if self.pendingBlocks.wantListLen > 0:
-    let cids = toSeq(self.pendingBlocks.wantList)
-    trace "Sending our want list to a peer", peer = peer.id, length = cids.len
-    await self.network.request.sendWantList(peer.id, cids, full = true)
+  # We send only blocks that the peer hasn't already told us that they already have.
+  let
+    peerHave = peer.peerHave
+    toAsk = self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave)
+
+  if toAsk.len > 0:
+    trace "Sending want list to a peer", peer = peer.id, length = toAsk.len
+    await self.network.request.sendWantList(peer.id, toAsk, full = true)
 
 proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
   for peer in self.peers.peers.values.toSeq:
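The hunk above narrows the refresh request from the full want list to a delta: only blocks the peer has not already advertised via its have list are asked for again. A minimal, self-contained sketch of that filter, using plain string CIDs and std/sets instead of the engine's BlockAddress and pendingBlocks types (all names here are illustrative, not the project's API):

import std/[sets, sequtils]

proc blocksToAsk(wantList, peerHave: HashSet[string]): seq[string] =
  # Keep only the wanted blocks the peer has not already told us it has.
  wantList.toSeq.filterIt(it notin peerHave)

when isMainModule:
  let
    wantList = toHashSet(["cid-a", "cid-b", "cid-c"])
    peerHave = toHashSet(["cid-b"])
  doAssert blocksToAsk(wantList, peerHave).toHashSet == toHashSet(["cid-a", "cid-c"])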
@@ -189,15 +194,13 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
     # want list in the coarsest way possible instead of over many
     # small updates.
     #
+    if peer.refreshInProgress:
+      trace "Peer refresh in progress", peer = peer.id
+      continue
+
     # In dynamic swarms, staleness will dominate latency.
     if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
-      # FIXME: we update the lastRefresh before actually refreshing because otherwise
-      # a slow peer will be bombarded with requests. If the request does fail or the
-      # peer does not reply, a retrying block will eventually issue this again. This
-      # is a complex and convoluted flow - ideally we should simply be tracking this
-      # request and retrying it on the absence of a response, eventually disconnecting
-      # the peer if it consistently fails to respond.
-      peer.refreshed()
+      peer.refreshRequested()
       # TODO: optimize this by keeping track of what was sent and sending deltas.
       # This should allow us to run much more frequent refreshes, and be way more
      # efficient about it.
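Combined with the staleness check, the new refreshInProgress guard means a peer is refreshed only when no request is already outstanding and either new wants were added since the last refresh or the backoff interval has elapsed. A simplified, hypothetical model of that decision, using integer ticks in place of chronos Moments (none of these identifiers belong to the project):

type PeerRefreshState = object
  refreshInProgress: bool
  lastRefresh: int    # tick at which we last sent a refresh request
  refreshBackoff: int # multiplier applied to the minimum refresh interval

const minRefreshIntervalTicks = 5

proc shouldRefresh(p: PeerRefreshState, lastInclusion, now: int): bool =
  # Never stack a second request on top of an outstanding one.
  if p.refreshInProgress:
    return false
  # Refresh when new wants arrived since the last request, or knowledge went stale.
  let stale = p.lastRefresh + p.refreshBackoff * minRefreshIntervalTicks < now
  result = p.lastRefresh < lastInclusion or stale

when isMainModule:
  let p = PeerRefreshState(refreshInProgress: false, lastRefresh: 0, refreshBackoff: 2)
  doAssert shouldRefresh(p, lastInclusion = 3, now = 4)     # new wants since the last refresh
  doAssert not shouldRefresh(p, lastInclusion = 0, now = 9) # 0 + 2 * 5 = 10 > 9: not stale yet
  doAssert shouldRefresh(p, lastInclusion = 0, now = 11)    # stale: 10 < 11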
@@ -393,6 +396,8 @@ proc blockPresenceHandler*(
   if peerCtx.isNil:
     return
 
+  peerCtx.refreshReplied()
+
   for blk in blocks:
     if presence =? Presence.init(blk):
       peerCtx.setPresence(presence)
@@ -25,12 +25,18 @@ import ../../logutils
 
 export payments, nitro
 
+const
+  MinRefreshInterval = 5.seconds
+  MaxRefreshBackoff = 36 # 3 minutes
+
 type BlockExcPeerCtx* = ref object of RootObj
   id*: PeerId
   blocks*: Table[BlockAddress, Presence] # remote peer have list including price
   wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
   exchanged*: int # times peer has exchanged with us
+  refreshInProgress*: bool # indicates if a refresh is in progress
   lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
+  refreshBackoff*: int = 1 # backoff factor for refresh requests
   account*: ?Account # ethereum account of this peer
   paymentChannel*: ?ChannelId # payment channel id
   blocksSent*: HashSet[BlockAddress] # blocks sent to peer
@@ -39,7 +45,7 @@ type BlockExcPeerCtx* = ref object of RootObj
   activityTimeout*: Duration
 
 proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
-  self.lastRefresh + 5.minutes < Moment.now()
+  self.lastRefresh + self.refreshBackoff * MinRefreshInterval < Moment.now()
 
 proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool =
   address in self.blocksSent
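With MinRefreshInterval = 5.seconds and the backoff doubling on every reply (see refreshReplied below) up to MaxRefreshBackoff = 36, the idle-refresh interval for a peer that keeps reporting nothing new grows as 5 s, 10 s, 20 s, and so on, capping at 36 * 5 s = 180 s, which is the 3 minutes noted next to the constant. A small sketch of that schedule, modelling Duration as integer seconds (the constant names below are local to the example):

const
  minRefreshIntervalSecs = 5
  maxRefreshBackoff = 36

proc refreshIntervals(): seq[int] =
  # Intervals produced by repeatedly doubling the backoff until it hits the cap.
  var backoff = 1
  while true:
    result.add backoff * minRefreshIntervalSecs
    if backoff == maxRefreshBackoff:
      break
    backoff = min(backoff * 2, maxRefreshBackoff)

when isMainModule:
  doAssert refreshIntervals() == @[5, 10, 20, 40, 80, 160, 180]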
@@ -50,9 +56,19 @@ proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) =
 proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) =
   self.blocksSent.excl(address)
 
-proc refreshed*(self: BlockExcPeerCtx) =
+proc refreshRequested*(self: BlockExcPeerCtx) =
+  trace "Refresh requested for peer", peer = self.id, backoff = self.refreshBackoff
+  self.refreshInProgress = true
   self.lastRefresh = Moment.now()
 
+proc refreshReplied*(self: BlockExcPeerCtx) =
+  self.refreshInProgress = false
+  self.lastRefresh = Moment.now()
+  self.refreshBackoff = min(self.refreshBackoff * 2, MaxRefreshBackoff)
+
+proc havesUpdated(self: BlockExcPeerCtx) =
+  self.refreshBackoff = 1
+
 proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
   # XXX: this is ugly an inefficient, but since those will typically
   # be used in "joins", it's better to pay the price here and have
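Taken together, refreshRequested marks a refresh as in flight, refreshReplied (called from blockPresenceHandler above) clears the flag and doubles the backoff, and havesUpdated (called from setPresence below when a previously unknown address shows up) resets the backoff to 1 so peers that keep producing new information are polled more often. A hypothetical, trimmed-down mirror of that lifecycle, without the real BlockExcPeerCtx or trace logging:

const maxBackoff = 36

type RefreshState = object
  inProgress: bool
  backoff: int

proc requested(s: var RefreshState) =
  s.inProgress = true

proc replied(s: var RefreshState) =
  s.inProgress = false
  s.backoff = min(s.backoff * 2, maxBackoff)

proc newHaveObserved(s: var RefreshState) =
  # Mirrors havesUpdated: fresh information makes the peer worth polling again soon.
  s.backoff = 1

when isMainModule:
  var s = RefreshState(inProgress: false, backoff: 1)
  s.requested()
  doAssert s.inProgress
  s.replied()          # reply brings nothing new: back off
  s.replied()
  doAssert s.backoff == 4
  s.newHaveObserved()  # the peer advertised an unseen block
  doAssert s.backoff == 1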
@@ -63,6 +79,9 @@ proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
   address in self.blocks
 
 func setPresence*(self: BlockExcPeerCtx, presence: Presence) =
+  if presence.address notin self.blocks:
+    self.havesUpdated()
+
   self.blocks[presence.address] = presence
 
 func cleanPresence*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]) =