Merge branch 'dev/etan/zf-branchpull' into feat/splitview

This commit is contained in:
Etan Kissling 2024-03-26 11:17:38 +01:00
commit be5ad82f33
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
1 changed files with 19 additions and 19 deletions

View File

@ -107,10 +107,18 @@ proc discoverBranch(
debug "Peer's head block root is already known"
return
# Many peers disconnect on rate limit, we have to avoid getting hit by it
const
maxRequestsPerBurst = 20
burstDuration = chronos.seconds(40)
maxRequestsPerBurst = 15
burstDuration = chronos.seconds(30)
let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
template consumeTokens(numTokens: int) =
try:
await bucket.consume(numTokens)
except CancelledError as exc:
raise exc
except CatchableError as exc:
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
var parentSlot = peerHeadSlot + 1
logScope: parentSlot
@ -123,17 +131,12 @@ proc discoverBranch(
return
debug "Discovering new branch from peer"
try:
await bucket.consume(1)
except CancelledError as exc:
raise exc
except CatchableError as exc:
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
consumeTokens(1)
let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
if rsp.isErr:
# `eth2_network` already descored according to the specific error
debug "Failed to receive block", err = rsp.error
await sleepAsync(RESP_TIMEOUT_DUR)
consumeTokens(5)
continue
template blocks: untyped = rsp.get
@ -141,7 +144,7 @@ proc discoverBranch(
if blocks.len == 0:
peer.updateScore(PeerScoreNoValues)
debug "Received no blocks", numBlocks = blocks.len
await sleepAsync(RESP_TIMEOUT_DUR)
consumeTokens(5)
continue
if blocks.len > 1:
peer.updateScore(PeerScoreBadResponse)
@ -172,17 +175,12 @@ proc discoverBranch(
debug "Failed to discover new branch from peer"
return
try:
await bucket.consume(1)
except CancelledError as exc:
raise exc
except CatchableError as exc:
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
consumeTokens(1)
let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
if r.isErr:
# `eth2_network` already descored according to the specific error
debug "Failed to receive blobs", err = r.error
await sleepAsync(RESP_TIMEOUT_DUR)
consumeTokens(5)
continue
template blobSidecars: untyped = r.unsafeGet
@ -190,7 +188,7 @@ proc discoverBranch(
peer.updateScore(PeerScoreMissingValues)
debug "Received not all blobs",
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
await sleepAsync(RESP_TIMEOUT_DUR)
consumeTokens(5)
continue
if blobSidecars.len > blobIds.len:
peer.updateScore(PeerScoreBadResponse)
@ -243,7 +241,9 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
try:
while true:
await self[].isActive.wait()
await sleepAsync(RESP_TIMEOUT_DUR)
const pollInterval = chronos.seconds(2)
await sleepAsync(pollInterval)
let peer =
if self[].peerQueue.len > 0: