tweak rate limiting
This commit is contained in:
parent
7f26fb1670
commit
1c04697e1d
|
@ -107,10 +107,18 @@ proc discoverBranch(
|
||||||
debug "Peer's head block root is already known"
|
debug "Peer's head block root is already known"
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Many peers disconnect on rate limit, we have to avoid getting hit by it
|
||||||
const
|
const
|
||||||
maxRequestsPerBurst = 20
|
maxRequestsPerBurst = 15
|
||||||
burstDuration = chronos.seconds(40)
|
burstDuration = chronos.seconds(30)
|
||||||
let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
|
let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
|
||||||
|
template consumeTokens(numTokens: int) =
|
||||||
|
try:
|
||||||
|
await bucket.consume(numTokens)
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
||||||
|
|
||||||
var parentSlot = peerHeadSlot + 1
|
var parentSlot = peerHeadSlot + 1
|
||||||
logScope: parentSlot
|
logScope: parentSlot
|
||||||
|
@ -123,17 +131,12 @@ proc discoverBranch(
|
||||||
return
|
return
|
||||||
|
|
||||||
debug "Discovering new branch from peer"
|
debug "Discovering new branch from peer"
|
||||||
try:
|
consumeTokens(1)
|
||||||
await bucket.consume(1)
|
|
||||||
except CancelledError as exc:
|
|
||||||
raise exc
|
|
||||||
except CatchableError as exc:
|
|
||||||
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
|
||||||
let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
|
let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
|
||||||
if rsp.isErr:
|
if rsp.isErr:
|
||||||
# `eth2_network` already descored according to the specific error
|
# `eth2_network` already descored according to the specific error
|
||||||
debug "Failed to receive block", err = rsp.error
|
debug "Failed to receive block", err = rsp.error
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
consumeTokens(5)
|
||||||
continue
|
continue
|
||||||
template blocks: untyped = rsp.get
|
template blocks: untyped = rsp.get
|
||||||
|
|
||||||
|
@ -141,7 +144,7 @@ proc discoverBranch(
|
||||||
if blocks.len == 0:
|
if blocks.len == 0:
|
||||||
peer.updateScore(PeerScoreNoValues)
|
peer.updateScore(PeerScoreNoValues)
|
||||||
debug "Received no blocks", numBlocks = blocks.len
|
debug "Received no blocks", numBlocks = blocks.len
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
consumeTokens(5)
|
||||||
continue
|
continue
|
||||||
if blocks.len > 1:
|
if blocks.len > 1:
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
@ -172,17 +175,12 @@ proc discoverBranch(
|
||||||
debug "Failed to discover new branch from peer"
|
debug "Failed to discover new branch from peer"
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
consumeTokens(1)
|
||||||
await bucket.consume(1)
|
|
||||||
except CancelledError as exc:
|
|
||||||
raise exc
|
|
||||||
except CatchableError as exc:
|
|
||||||
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
|
||||||
let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
|
let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
|
||||||
if r.isErr:
|
if r.isErr:
|
||||||
# `eth2_network` already descored according to the specific error
|
# `eth2_network` already descored according to the specific error
|
||||||
debug "Failed to receive blobs", err = r.error
|
debug "Failed to receive blobs", err = r.error
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
consumeTokens(5)
|
||||||
continue
|
continue
|
||||||
template blobSidecars: untyped = r.unsafeGet
|
template blobSidecars: untyped = r.unsafeGet
|
||||||
|
|
||||||
|
@ -190,7 +188,7 @@ proc discoverBranch(
|
||||||
peer.updateScore(PeerScoreMissingValues)
|
peer.updateScore(PeerScoreMissingValues)
|
||||||
debug "Received not all blobs",
|
debug "Received not all blobs",
|
||||||
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
|
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
consumeTokens(5)
|
||||||
continue
|
continue
|
||||||
if blobSidecars.len > blobIds.len:
|
if blobSidecars.len > blobIds.len:
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
@ -243,7 +241,9 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
|
||||||
try:
|
try:
|
||||||
while true:
|
while true:
|
||||||
await self[].isActive.wait()
|
await self[].isActive.wait()
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
|
||||||
|
const pollInterval = chronos.seconds(2)
|
||||||
|
await sleepAsync(pollInterval)
|
||||||
|
|
||||||
let peer =
|
let peer =
|
||||||
if self[].peerQueue.len > 0:
|
if self[].peerQueue.len > 0:
|
||||||
|
|
Loading…
Reference in New Issue