mirror of https://github.com/logos-storage/logos-storage-nim.git
feat(blockexchange): implement delta WantList updates with batching
Implements delta-based WantList updates to reduce network traffic during
block exchange. Only sends newly added blocks instead of resending the
entire WantList on every refresh.

Also some network-related fixes:

- Add TCP_NODELAY flag to prevent Nagle's algorithm delays
- Clear sendConn on stream reset to allow garbage collection
- Improve error handling in NetworkPeer.send()

Part of https://github.com/codex-storage/nim-codex/issues/974

Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
parent 6f378b3c46
commit aea9337ddc
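The heart of the change is set arithmetic over block addresses: remember which wants were last sent to each peer, and on refresh transmit only the difference, in bounded batches. Below is a minimal, self-contained sketch of that idea, not the engine code: addresses are simplified to strings, and sendBatch is a hypothetical stub standing in for the real sendWantList request.

import std/[sets, sequtils]

const MaxWantListBatchSize = 1024

proc sendBatch(batch: seq[string], full: bool) =
  echo "sending ", batch.len, " wants, full = ", full

proc refresh(wanted, peerHave: HashSet[string],
             lastSentWants: var HashSet[string]) =
  # Ask only for blocks the peer hasn't told us it already has.
  let toAsk = wanted - peerHave
  # Delta: only the wants we haven't already sent to this peer.
  let newWants = (toAsk - lastSentWants).toSeq
  var offset = 0
  while offset < newWants.len:
    let batchEnd = min(offset + MaxWantListBatchSize, newWants.len)
    sendBatch(newWants[offset ..< batchEnd], full = false)
    offset = batchEnd
  # Track the full current set so dropped wants don't linger in the delta base.
  lastSentWants = toAsk

when isMainModule:
  var sent = initHashSet[string]()
  refresh(toHashSet(["a", "b", "c"]), toHashSet(["c"]), sent)      # sends "a", "b"
  refresh(toHashSet(["a", "b", "c", "d"]), toHashSet(["c"]), sent) # sends only "d"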
@@ -84,9 +84,9 @@ declareCounter(
 const
   DefaultMaxPeersPerRequest* = 10
   # The default max message length of nim-libp2p is 100 megabytes, meaning we can
-  # in principle fit up to 1600 64k blocks per message, so 50 is well under
+  # in principle fit up to 1600 64k blocks per message, so 20 is well under
   # that number.
-  DefaultMaxBlocksPerMessage = 50
+  DefaultMaxBlocksPerMessage = 20
   DefaultTaskQueueSize = 100
   DefaultConcurrentTasks = 10
   # Don't do more than one discovery request per `DiscoveryRateLimit` seconds.
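As a quick sanity check of the arithmetic in the comment above (a sketch with illustrative values only, sizes as cited in the comment):

# Headroom behind DefaultMaxBlocksPerMessage.
const
  MaxMessageBytes = 100 * 1024 * 1024 # nim-libp2p default max message length
  BlockBytes = 64 * 1024              # one 64 KiB block

echo MaxMessageBytes div BlockBytes # 1600 blocks fit per message in principle
echo 20 * BlockBytes                # 1310720 bytes, ~1.25 MiB for 20 blocks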
@@ -184,17 +184,74 @@ proc sendWantBlock(
   codex_block_exchange_want_block_lists_sent.inc()

 proc refreshBlockKnowledge(
-  self: BlockExcEngine, peer: BlockExcPeerCtx
+  self: BlockExcEngine, peer: BlockExcPeerCtx, skipDelta = false, resetBackoff = false
 ) {.async: (raises: [CancelledError]).} =
-  if self.pendingBlocks.wantListLen > 0:
-    # We send only blocks that the peer hasn't already told us that they already have.
-    let
-      peerHave = peer.peerHave
-      toAsk = self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave)
+  if self.pendingBlocks.wantListLen == 0:
+    if peer.lastSentWants.len > 0:
+      trace "Clearing want list tracking, no pending blocks", peer = peer.id
+      peer.lastSentWants.clear()
+    return

-    if toAsk.len > 0:
-      trace "Sending want list to a peer", peer = peer.id, length = toAsk.len
-      await self.network.request.sendWantList(peer.id, toAsk, full = true)
+  # We send only blocks that the peer hasn't already told us that they already have.
+  let
+    peerHave = peer.peerHave
+    toAsk = toHashSet(self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave))
+
+  if toAsk.len == 0:
+    if peer.lastSentWants.len > 0:
+      trace "Clearing want list tracking, peer has all blocks", peer = peer.id
+      peer.lastSentWants.clear()
+    return
+
+  let newWants = toAsk - peer.lastSentWants
+
+  if peer.lastSentWants.len > 0 and not skipDelta:
+    if newWants.len > 0:
+      trace "Sending delta want list update",
+        peer = peer.id, newWants = newWants.len, totalWants = toAsk.len
+
+      let newWantsSeq = newWants.toSeq
+      var offset = 0
+      while offset < newWantsSeq.len:
+        let batchEnd = min(offset + MaxWantListBatchSize, newWantsSeq.len)
+        let batch = newWantsSeq[offset ..< batchEnd]
+
+        trace "Sending want list batch",
+          peer = peer.id,
+          batchSize = batch.len,
+          offset = offset,
+          total = newWantsSeq.len
+
+        await self.network.request.sendWantList(peer.id, batch, full = false)
+        for address in batch:
+          peer.lastSentWants.incl(address)
+
+        offset = batchEnd
+
+      if resetBackoff:
+        peer.wantsUpdated
+    else:
+      trace "No changes in want list, skipping send", peer = peer.id
+    peer.lastSentWants = toAsk
+  else:
+    trace "Sending full want list", peer = peer.id, length = toAsk.len
+
+    let toAskSeq = toAsk.toSeq
+    var offset = 0
+    while offset < toAskSeq.len:
+      let batchEnd = min(offset + MaxWantListBatchSize, toAskSeq.len)
+      let batch = toAskSeq[offset ..< batchEnd]
+
+      trace "Sending full want list batch",
+        peer = peer.id, batchSize = batch.len, offset = offset, total = toAskSeq.len
+
+      await self.network.request.sendWantList(peer.id, batch, full = (offset == 0))
+      for address in batch:
+        peer.lastSentWants.incl(address)
+      offset = batchEnd
+
+    if resetBackoff:
+      peer.wantsUpdated

 proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
   let runtimeQuota = 10.milliseconds
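A note on the full resend path above: only the first batch is flagged full (full = (offset == 0)). Presumably the receiver treats a full WantList as replacing its view of our wants and full = false messages as incremental additions, so a want list larger than MaxWantListBatchSize is rebuilt as one replacing batch followed by appending ones, rather than as a single oversized message.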
@@ -211,13 +268,16 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
       #

       # In dynamic swarms, staleness will dominate latency.
-      if peer.isKnowledgeStale or peer.lastRefresh < self.pendingBlocks.lastInclusion:
+      let
+        hasNewBlocks = peer.lastRefresh < self.pendingBlocks.lastInclusion
+        isKnowledgeStale = peer.isKnowledgeStale
+
+      if isKnowledgeStale or hasNewBlocks:
         if not peer.refreshInProgress:
           peer.refreshRequested()
-          # TODO: optimize this by keeping track of what was sent and sending deltas.
-          # This should allow us to run much more frequent refreshes, and be way more
-          # efficient about it.
-          await self.refreshBlockKnowledge(peer)
+          await self.refreshBlockKnowledge(
+            peer, skipDelta = isKnowledgeStale, resetBackoff = hasNewBlocks
+          )
       else:
         trace "Not refreshing: peer is up to date", peer = peer.id

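The caller now maps the two refresh triggers onto the new parameters: stale peer knowledge forces a full resend (skipDelta = isKnowledgeStale), presumably because lastSentWants can no longer be assumed to match what the peer actually knows, while newly added blocks reset the refresh backoff (resetBackoff = hasNewBlocks, via wantsUpdated) so peers hear about new wants promptly. This also resolves the TODO removed above, which asked for exactly this delta tracking.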
@@ -65,7 +65,9 @@ proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} =
   except CatchableError as err:
     warn "Exception in blockexc read loop", msg = err.msg
   finally:
-    trace "Detaching read loop", peer = self.id, connId = conn.oid
+    warn "Detaching read loop", peer = self.id, connId = conn.oid
+    if self.sendConn == conn:
+      self.sendConn = nil
     await conn.close()

 proc connect*(
@@ -89,7 +91,12 @@ proc send*(
     return

   trace "Sending message", peer = self.id, connId = conn.oid
-  await conn.writeLp(protobufEncode(msg))
+  try:
+    await conn.writeLp(protobufEncode(msg))
+  except CatchableError as err:
+    if self.sendConn == conn:
+      self.sendConn = nil
+    raise newException(LPStreamError, "Failed to send message: " & err.msg)

 func new*(
   T: type NetworkPeer,
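Both NetworkPeer changes address the same failure mode: readLoop and send now drop the cached sendConn once its stream is dead, so the next send establishes a fresh connection instead of reusing a defunct one, and the old Connection object becomes eligible for garbage collection (the commit message's second bullet). Wrapping the write failure in a new LPStreamError presumably keeps send's raised-error surface narrow while preserving the original error message.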
@@ -28,6 +28,7 @@ export payments, nitro
 const
   MinRefreshInterval = 1.seconds
   MaxRefreshBackoff = 36 # 36 seconds
+  MaxWantListBatchSize* = 1024 # Maximum blocks to send per WantList message

 type BlockExcPeerCtx* = ref object of RootObj
   id*: PeerId
@@ -43,6 +44,8 @@ type BlockExcPeerCtx* = ref object of RootObj
   blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer
   lastExchange*: Moment # last time peer has sent us a block
   activityTimeout*: Duration
+  lastSentWants*: HashSet[BlockAddress]
+    # track what wantList we last sent for delta updates

 proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
   let staleness =
@@ -77,6 +80,9 @@ proc refreshReplied*(self: BlockExcPeerCtx) =
 proc havesUpdated(self: BlockExcPeerCtx) =
   self.refreshBackoff = 1

+proc wantsUpdated*(self: BlockExcPeerCtx) =
+  self.refreshBackoff = 1
+
 proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
   # XXX: this is ugly an inefficient, but since those will typically
   # be used in "joins", it's better to pay the price here and have
@@ -211,7 +211,7 @@ proc new*(
     .withMaxConnections(config.maxPeers)
     .withAgentVersion(config.agentString)
     .withSignedPeerRecord(true)
-    .withTcpTransport({ServerFlags.ReuseAddr})
+    .withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
     .build()

 var
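For context on TcpNoDelay: Nagle's algorithm holds back small writes to coalesce them with later ones, which can add round-trip-scale latency to small control messages such as WantList deltas. At the socket level the flag boils down to setting TCP_NODELAY; a rough sketch with Nim's standard library (not the libp2p code path):

import std/[net, nativesockets]

# Disable Nagle's algorithm so small messages go out immediately
# instead of being buffered for coalescing with later writes.
let sock = newSocket()
sock.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.toInt)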
@@ -53,7 +53,7 @@ logScope:
   topics = "codex node"

 const
-  DefaultFetchBatch = 8192
+  DefaultFetchBatch = 2048
   MaxOnBatchBlocks = 128

 type