mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-09 08:53:10 +00:00
perf: add time-based yielding to hot loops
Part of https://github.com/codex-storage/nim-codex/issues/974 Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
This commit is contained in:
parent
f7a3e90414
commit
6f378b3c46
@ -92,6 +92,7 @@ const
|
||||
# Don't do more than one discovery request per `DiscoveryRateLimit` seconds.
|
||||
DiscoveryRateLimit = 1.seconds
|
||||
DefaultPeerActivityTimeout = 1.minutes
|
||||
PresenceBatchSize = 1024
|
||||
|
||||
type
|
||||
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
|
||||
@ -196,6 +197,9 @@ proc refreshBlockKnowledge(
|
||||
await self.network.request.sendWantList(peer.id, toAsk, full = true)
|
||||
|
||||
proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
for peer in self.peers.peers.values.toSeq:
|
||||
# We refresh block knowledge if:
|
||||
# 1. the peer hasn't been refreshed in a while;
|
||||
@ -217,6 +221,13 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
|
||||
else:
|
||||
trace "Not refreshing: peer is up to date", peer = peer.id
|
||||
|
||||
if (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
try:
|
||||
await idleAsync()
|
||||
except CancelledError:
|
||||
discard
|
||||
lastIdle = Moment.now()
|
||||
|
||||
proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
|
||||
if self.lastDiscRequest + DiscoveryRateLimit < Moment.now():
|
||||
trace "Searching for new peers for", cid = cid
|
||||
@ -595,6 +606,9 @@ proc blocksDeliveryHandler*(
|
||||
var validatedBlocksDelivery: seq[BlockDelivery]
|
||||
let peerCtx = self.peers.get(peer)
|
||||
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
for bd in blocksDelivery:
|
||||
logScope:
|
||||
peer = peer
|
||||
@ -632,6 +646,15 @@ proc blocksDeliveryHandler*(
|
||||
|
||||
validatedBlocksDelivery.add(bd)
|
||||
|
||||
if (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
try:
|
||||
await idleAsync()
|
||||
except CancelledError:
|
||||
discard
|
||||
except CatchableError:
|
||||
discard
|
||||
lastIdle = Moment.now()
|
||||
|
||||
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
|
||||
|
||||
if peerCtx != nil:
|
||||
@ -657,6 +680,9 @@ proc wantListHandler*(
|
||||
presence: seq[BlockPresence]
|
||||
schedulePeer = false
|
||||
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
try:
|
||||
for e in wantList.entries:
|
||||
logScope:
|
||||
@ -717,8 +743,20 @@ proc wantListHandler*(
|
||||
if e.wantType == WantType.WantBlock:
|
||||
schedulePeer = true
|
||||
|
||||
if presence.len >= PresenceBatchSize or (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
if presence.len > 0:
|
||||
trace "Sending presence batch to remote", items = presence.len
|
||||
await self.network.request.sendPresence(peer, presence)
|
||||
presence = @[]
|
||||
try:
|
||||
await idleAsync()
|
||||
except CancelledError:
|
||||
discard
|
||||
lastIdle = Moment.now()
|
||||
|
||||
# Send any remaining presence messages
|
||||
if presence.len > 0:
|
||||
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
|
||||
trace "Sending final presence to remote", items = presence.len
|
||||
await self.network.request.sendPresence(peer, presence)
|
||||
|
||||
if schedulePeer:
|
||||
|
||||
@ -38,12 +38,19 @@ method getBlocks*(
|
||||
localAddresses: seq[BlockAddress]
|
||||
remoteAddresses: seq[BlockAddress]
|
||||
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
for address in addresses:
|
||||
if not (await address in self.localStore):
|
||||
remoteAddresses.add(address)
|
||||
else:
|
||||
localAddresses.add(address)
|
||||
|
||||
if (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
await idleAsync()
|
||||
lastIdle = Moment.now()
|
||||
|
||||
return chain(
|
||||
await self.localStore.getBlocks(localAddresses),
|
||||
self.engine.requestBlocks(remoteAddresses),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user