mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-07 16:03:13 +00:00
refactor: make markRequested idempotent
Returns false on duplicate marking attempts instead of logging errors, eliminating duplicate marking loop in blockPresenceHandler and preventing duplicate block requests across concurrent flows. Part of https://github.com/codex-storage/nim-codex/issues/974
This commit is contained in:
parent
dddf7424b4
commit
0636adf5e8
@ -78,10 +78,9 @@ declareCounter(
|
|||||||
)
|
)
|
||||||
declareCounter(
|
declareCounter(
|
||||||
codex_block_exchange_requests_failed_total,
|
codex_block_exchange_requests_failed_total,
|
||||||
"Total number of block requests that failed after exhausting retries"
|
"Total number of block requests that failed after exhausting retries",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultMaxPeersPerRequest* = 10
|
DefaultMaxPeersPerRequest* = 10
|
||||||
# The default max message length of nim-libp2p is 100 megabytes, meaning we can
|
# The default max message length of nim-libp2p is 100 megabytes, meaning we can
|
||||||
@ -296,7 +295,7 @@ proc downloadInternal(
|
|||||||
let scheduledPeer =
|
let scheduledPeer =
|
||||||
if not self.pendingBlocks.isRequested(address):
|
if not self.pendingBlocks.isRequested(address):
|
||||||
let peer = self.selectPeer(peers.with)
|
let peer = self.selectPeer(peers.with)
|
||||||
self.pendingBlocks.markRequested(address, peer.id)
|
discard self.pendingBlocks.markRequested(address, peer.id)
|
||||||
peer.blockRequestScheduled(address)
|
peer.blockRequestScheduled(address)
|
||||||
trace "Request block from block retry loop"
|
trace "Request block from block retry loop"
|
||||||
await self.sendWantBlock(@[address], peer)
|
await self.sendWantBlock(@[address], peer)
|
||||||
@ -435,12 +434,11 @@ proc blockPresenceHandler*(
|
|||||||
|
|
||||||
let ourWantCids = ourWantList.filterIt(
|
let ourWantCids = ourWantList.filterIt(
|
||||||
it in peerHave and not self.pendingBlocks.retriesExhausted(it) and
|
it in peerHave and not self.pendingBlocks.retriesExhausted(it) and
|
||||||
not self.pendingBlocks.isRequested(it)
|
self.pendingBlocks.markRequested(it, peer)
|
||||||
).toSeq
|
).toSeq
|
||||||
|
|
||||||
for address in ourWantCids:
|
for address in ourWantCids:
|
||||||
self.pendingBlocks.decRetries(address)
|
self.pendingBlocks.decRetries(address)
|
||||||
self.pendingBlocks.markRequested(address, peer)
|
|
||||||
peerCtx.blockRequestScheduled(address)
|
peerCtx.blockRequestScheduled(address)
|
||||||
|
|
||||||
if ourWantCids.len > 0:
|
if ourWantCids.len > 0:
|
||||||
|
|||||||
@ -155,15 +155,18 @@ func getRequestPeer*(self: PendingBlocksManager, address: BlockAddress): ?PeerId
|
|||||||
self.blocks.withValue(address, pending):
|
self.blocks.withValue(address, pending):
|
||||||
result = pending[].requested
|
result = pending[].requested
|
||||||
|
|
||||||
proc markRequested*(self: PendingBlocksManager, address: BlockAddress, peer: PeerId) =
|
proc markRequested*(
|
||||||
|
self: PendingBlocksManager, address: BlockAddress, peer: PeerId
|
||||||
|
): bool =
|
||||||
## Marks this block as having been requested to a peer
|
## Marks this block as having been requested to a peer
|
||||||
##
|
##
|
||||||
|
|
||||||
if self.isRequested(address):
|
if self.isRequested(address):
|
||||||
error "Attempt to request block twice", address = address, peer = peer
|
return false
|
||||||
|
|
||||||
self.blocks.withValue(address, pending):
|
self.blocks.withValue(address, pending):
|
||||||
pending[].requested = peer.some
|
pending[].requested = peer.some
|
||||||
|
return true
|
||||||
|
|
||||||
proc clearRequest*(
|
proc clearRequest*(
|
||||||
self: PendingBlocksManager, address: BlockAddress, peer: ?PeerId = PeerId.none
|
self: PendingBlocksManager, address: BlockAddress, peer: ?PeerId = PeerId.none
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user