mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-03-01 04:10:45 +00:00
Beacon sync mitigate deadlock with bogus sticky peers (#2943)
* Metrics cosmetics * Better naming for error threshold constants * Treating header/body process error different from response errors why: Error handling becomes active not until some consecutive failures appear. As both types of errors may interleave (i.g. no response errors) the counter reset for one type might affect the other. By doing it wrong, a peer might send repeatedly a bogus block so locking in the syncer in an endless loop.
This commit is contained in:
parent
650fec5a26
commit
0ce5234231
@ -173,6 +173,9 @@ proc blocksStagedCollect*(
|
|||||||
# so that `async` can capture that properly.
|
# so that `async` can capture that properly.
|
||||||
blk = (ref BlocksForImport)()
|
blk = (ref BlocksForImport)()
|
||||||
|
|
||||||
|
# Flag, not to reset error count
|
||||||
|
haveError = false
|
||||||
|
|
||||||
# nFetchBodiesRequest
|
# nFetchBodiesRequest
|
||||||
while true:
|
while true:
|
||||||
# Extract bottom range interval and fetch/stage it
|
# Extract bottom range interval and fetch/stage it
|
||||||
@ -196,14 +199,20 @@ proc blocksStagedCollect*(
|
|||||||
# Throw away first time block fetch data. Keep other data for a
|
# Throw away first time block fetch data. Keep other data for a
|
||||||
# partially assembled list.
|
# partially assembled list.
|
||||||
if nBlkBlocks == 0:
|
if nBlkBlocks == 0:
|
||||||
buddy.only.nBdyRespErrors.inc
|
buddy.only.nBdyProcErrors.inc
|
||||||
|
haveError = true
|
||||||
|
|
||||||
if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or
|
if ((0 < buddy.only.nBdyRespErrors or
|
||||||
fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors:
|
0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or
|
||||||
|
fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or
|
||||||
|
fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors:
|
||||||
# Make sure that this peer does not immediately reconnect
|
# Make sure that this peer does not immediately reconnect
|
||||||
buddy.ctrl.zombie = true
|
buddy.ctrl.zombie = true
|
||||||
|
|
||||||
trace info & ": current block list discarded", peer, iv, ivReq,
|
trace info & ": current block list discarded", peer, iv, ivReq,
|
||||||
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors
|
nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state,
|
||||||
|
bdyErrors=buddy.bdyErrors
|
||||||
|
|
||||||
ctx.blocksUnprocCommit(iv.len, iv)
|
ctx.blocksUnprocCommit(iv.len, iv)
|
||||||
# At this stage allow a task switch so that some other peer might try
|
# At this stage allow a task switch so that some other peer might try
|
||||||
# to work on the currently returned interval.
|
# to work on the currently returned interval.
|
||||||
@ -239,8 +248,16 @@ proc blocksStagedCollect*(
|
|||||||
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
||||||
qItem.data = blk[]
|
qItem.data = blk[]
|
||||||
|
|
||||||
|
# Reset block process errors (not too many consecutive failures this time)
|
||||||
|
if not haveError:
|
||||||
|
buddy.only.nBdyProcErrors = 0
|
||||||
|
|
||||||
trace info & ": staged blocks", peer, bottomBlock=iv.minPt.bnStr,
|
trace info & ": staged blocks", peer, bottomBlock=iv.minPt.bnStr,
|
||||||
nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state
|
nBlocks=blk.blocks.len, nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state,
|
||||||
|
bdyErrors=buddy.bdyErrors
|
||||||
|
|
||||||
|
# Update, so it can be followed nicely
|
||||||
|
ctx.updateMetrics()
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
|
@ -22,9 +22,12 @@ import
|
|||||||
# Public functions
|
# Public functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
func bdyErrors*(buddy: BeaconBuddyRef): string =
|
||||||
|
$buddy.only.nBdyRespErrors & "/" & $buddy.only.nBdyProcErrors
|
||||||
|
|
||||||
proc fetchRegisterError*(buddy: BeaconBuddyRef) =
|
proc fetchRegisterError*(buddy: BeaconBuddyRef) =
|
||||||
buddy.only.nBdyRespErrors.inc
|
buddy.only.nBdyRespErrors.inc
|
||||||
if fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors:
|
if fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors:
|
||||||
buddy.ctrl.zombie = true # abandon slow peer
|
buddy.ctrl.zombie = true # abandon slow peer
|
||||||
|
|
||||||
proc bodiesFetch*(
|
proc bodiesFetch*(
|
||||||
@ -39,8 +42,7 @@ proc bodiesFetch*(
|
|||||||
start = Moment.now()
|
start = Moment.now()
|
||||||
nReq = blockHashes.len
|
nReq = blockHashes.len
|
||||||
|
|
||||||
trace trEthSendSendingGetBlockBodies, peer, nReq,
|
trace trEthSendSendingGetBlockBodies, peer, nReq, bdyErrors=buddy.bdyErrors
|
||||||
nRespErrors=buddy.only.nBdyRespErrors
|
|
||||||
|
|
||||||
var resp: Option[blockBodiesObj]
|
var resp: Option[blockBodiesObj]
|
||||||
try:
|
try:
|
||||||
@ -48,7 +50,7 @@ proc bodiesFetch*(
|
|||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
buddy.fetchRegisterError()
|
buddy.fetchRegisterError()
|
||||||
`info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr,
|
`info` info & " error", peer, nReq, elapsed=(Moment.now() - start).toStr,
|
||||||
error=($e.name), msg=e.msg, nRespErrors=buddy.only.nBdyRespErrors
|
error=($e.name), msg=e.msg, bdyErrors=buddy.bdyErrors
|
||||||
return err()
|
return err()
|
||||||
|
|
||||||
let elapsed = Moment.now() - start
|
let elapsed = Moment.now() - start
|
||||||
@ -57,8 +59,7 @@ proc bodiesFetch*(
|
|||||||
if resp.isNone or buddy.ctrl.stopped:
|
if resp.isNone or buddy.ctrl.stopped:
|
||||||
buddy.fetchRegisterError()
|
buddy.fetchRegisterError()
|
||||||
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0,
|
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0,
|
||||||
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors
|
||||||
nRespErrors=buddy.only.nBdyRespErrors
|
|
||||||
return err()
|
return err()
|
||||||
|
|
||||||
let b: seq[BlockBody] = resp.get.blocks
|
let b: seq[BlockBody] = resp.get.blocks
|
||||||
@ -71,15 +72,14 @@ proc bodiesFetch*(
|
|||||||
|
|
||||||
# Ban an overly slow peer for a while when seen in a row. Also there is a
|
# Ban an overly slow peer for a while when seen in a row. Also there is a
|
||||||
# mimimum share of the number of requested headers expected, typically 10%.
|
# mimimum share of the number of requested headers expected, typically 10%.
|
||||||
if fetchBodiesReqThresholdZombie < elapsed or
|
if fetchBodiesReqErrThresholdZombie < elapsed or
|
||||||
b.len.uint64 * 100 < nReq.uint64 * fetchBodiesReqMinResponsePC:
|
b.len.uint64 * 100 < nReq.uint64 * fetchBodiesReqMinResponsePC:
|
||||||
buddy.fetchRegisterError()
|
buddy.fetchRegisterError()
|
||||||
else:
|
else:
|
||||||
buddy.only.nBdyRespErrors = 0 # reset error count
|
buddy.only.nBdyRespErrors = 0 # reset error count
|
||||||
|
|
||||||
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
|
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
|
||||||
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, bdyErrors=buddy.bdyErrors
|
||||||
nRespErrors=buddy.only.nBdyRespErrors
|
|
||||||
|
|
||||||
return ok(b)
|
return ok(b)
|
||||||
|
|
||||||
|
@ -132,6 +132,9 @@ proc headersStagedCollect*(
|
|||||||
# so that `async` can capture that properly.
|
# so that `async` can capture that properly.
|
||||||
lhc = (ref LinkedHChain)(parentHash: topLink)
|
lhc = (ref LinkedHChain)(parentHash: topLink)
|
||||||
|
|
||||||
|
# Flag, not to reset error count
|
||||||
|
haveError = false
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
# Extract top range interval and fetch/stage it
|
# Extract top range interval and fetch/stage it
|
||||||
let
|
let
|
||||||
@ -151,15 +154,17 @@ proc headersStagedCollect*(
|
|||||||
# Throw away opportunistic data (or first time header fetch.) Keep
|
# Throw away opportunistic data (or first time header fetch.) Keep
|
||||||
# other data for a partially assembled list.
|
# other data for a partially assembled list.
|
||||||
if isOpportunistic or nLhcHeaders == 0:
|
if isOpportunistic or nLhcHeaders == 0:
|
||||||
buddy.only.nHdrRespErrors.inc
|
buddy.only.nHdrProcErrors.inc
|
||||||
|
haveError = true
|
||||||
|
|
||||||
if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or
|
if ((0 < buddy.only.nHdrRespErrors or
|
||||||
fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors:
|
0 < buddy.only.nHdrProcErrors) and buddy.ctrl.stopped) or
|
||||||
|
fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors or
|
||||||
|
fetchHeadersProcessErrThresholdCount < buddy.only.nHdrProcErrors:
|
||||||
# Make sure that this peer does not immediately reconnect
|
# Make sure that this peer does not immediately reconnect
|
||||||
buddy.ctrl.zombie = true
|
buddy.ctrl.zombie = true
|
||||||
trace info & ": current header list discarded", peer, iv, ivReq,
|
trace info & ": current header list discarded", peer, iv, ivReq,
|
||||||
isOpportunistic,
|
isOpportunistic, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors
|
||||||
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors
|
|
||||||
ctx.headersUnprocCommit(iv.len, iv)
|
ctx.headersUnprocCommit(iv.len, iv)
|
||||||
# At this stage allow a task switch so that some other peer might try
|
# At this stage allow a task switch so that some other peer might try
|
||||||
# to work on the currently returned interval.
|
# to work on the currently returned interval.
|
||||||
@ -195,9 +200,13 @@ proc headersStagedCollect*(
|
|||||||
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
|
||||||
qItem.data = lhc[]
|
qItem.data = lhc[]
|
||||||
|
|
||||||
trace info & ": staged a list of headers", peer,
|
# Reset header process errors (not too many consecutive failures this time)
|
||||||
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
|
if not haveError:
|
||||||
nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state
|
buddy.only.nHdrProcErrors = 0
|
||||||
|
|
||||||
|
trace info & ": staged a list of headers", peer, topBlock=iv.maxPt.bnStr,
|
||||||
|
nHeaders=lhc.revHdrs.len, nStaged=ctx.hdr.staged.len, isOpportunistic,
|
||||||
|
ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
|
@ -25,9 +25,17 @@ import
|
|||||||
|
|
||||||
proc registerError(buddy: BeaconBuddyRef) =
|
proc registerError(buddy: BeaconBuddyRef) =
|
||||||
buddy.only.nHdrRespErrors.inc
|
buddy.only.nHdrRespErrors.inc
|
||||||
if fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors:
|
if fetchHeadersReqErrThresholdCount < buddy.only.nHdrRespErrors:
|
||||||
buddy.ctrl.zombie = true # abandon slow peer
|
buddy.ctrl.zombie = true # abandon slow peer
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
# Public debugging & logging helpers
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
func hdrErrors*(buddy: BeaconBuddyRef): string =
|
||||||
|
$buddy.only.nHdrRespErrors & "/" & $buddy.only.nHdrProcErrors
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Public functions
|
# Public functions
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
@ -63,7 +71,7 @@ proc headersFetchReversed*(
|
|||||||
start = Moment.now()
|
start = Moment.now()
|
||||||
|
|
||||||
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
|
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
|
||||||
nReq=req.maxResults, useHash, nRespErrors=buddy.only.nHdrRespErrors
|
nReq=req.maxResults, useHash, hdrErrors=buddy.hdrErrors
|
||||||
|
|
||||||
# Fetch headers from peer
|
# Fetch headers from peer
|
||||||
var resp: Option[blockHeadersObj]
|
var resp: Option[blockHeadersObj]
|
||||||
@ -78,7 +86,7 @@ proc headersFetchReversed*(
|
|||||||
buddy.registerError()
|
buddy.registerError()
|
||||||
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
|
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
|
||||||
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg,
|
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg,
|
||||||
nRespErrors=buddy.only.nHdrRespErrors
|
hdrErrors=buddy.hdrErrors
|
||||||
return err()
|
return err()
|
||||||
|
|
||||||
let elapsed = Moment.now() - start
|
let elapsed = Moment.now() - start
|
||||||
@ -88,7 +96,7 @@ proc headersFetchReversed*(
|
|||||||
buddy.registerError()
|
buddy.registerError()
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
|
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
|
||||||
nResp=0, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
nResp=0, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
||||||
nRespErrors=buddy.only.nHdrRespErrors
|
hdrErrors=buddy.hdrErrors
|
||||||
return err()
|
return err()
|
||||||
|
|
||||||
let h: seq[Header] = resp.get.headers
|
let h: seq[Header] = resp.get.headers
|
||||||
@ -96,12 +104,12 @@ proc headersFetchReversed*(
|
|||||||
buddy.registerError()
|
buddy.registerError()
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
|
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
|
||||||
nResp=h.len, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
nResp=h.len, elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
||||||
nRespErrors=buddy.only.nHdrRespErrors
|
hdrErrors=buddy.hdrErrors
|
||||||
return err()
|
return err()
|
||||||
|
|
||||||
# Ban an overly slow peer for a while when seen in a row. Also there is a
|
# Ban an overly slow peer for a while when seen in a row. Also there is a
|
||||||
# mimimum share of the number of requested headers expected, typically 10%.
|
# mimimum share of the number of requested headers expected, typically 10%.
|
||||||
if fetchHeadersReqThresholdZombie < elapsed or
|
if fetchHeadersReqErrThresholdZombie < elapsed or
|
||||||
h.len.uint64 * 100 < req.maxResults * fetchHeadersReqMinResponsePC:
|
h.len.uint64 * 100 < req.maxResults * fetchHeadersReqMinResponsePC:
|
||||||
buddy.registerError()
|
buddy.registerError()
|
||||||
else:
|
else:
|
||||||
@ -109,8 +117,7 @@ proc headersFetchReversed*(
|
|||||||
|
|
||||||
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
|
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
|
||||||
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
|
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
|
||||||
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state,
|
elapsed=elapsed.toStr, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors
|
||||||
nRespErrors=buddy.only.nHdrRespErrors
|
|
||||||
|
|
||||||
return ok(h)
|
return ok(h)
|
||||||
|
|
||||||
|
@ -60,7 +60,10 @@ template updateMetricsImpl(ctx: BeaconCtxRef) =
|
|||||||
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
||||||
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
||||||
metrics.set(beacon_head, ctx.layout.head.int64)
|
metrics.set(beacon_head, ctx.layout.head.int64)
|
||||||
metrics.set(beacon_target, ctx.target.consHead.number.int64)
|
|
||||||
|
# Show last valid state.
|
||||||
|
if 0 < ctx.target.consHead.number:
|
||||||
|
metrics.set(beacon_target, ctx.target.consHead.number.int64)
|
||||||
|
|
||||||
metrics.set(beacon_header_lists_staged, ctx.headersStagedQueueLen())
|
metrics.set(beacon_header_lists_staged, ctx.headersStagedQueueLen())
|
||||||
metrics.set(beacon_headers_unprocessed,
|
metrics.set(beacon_headers_unprocessed,
|
||||||
|
@ -54,12 +54,16 @@ const
|
|||||||
## On `Geth`, responses to larger requests are all truncted to 1024 header
|
## On `Geth`, responses to larger requests are all truncted to 1024 header
|
||||||
## entries (see `Geth` constant `maxHeadersServe`.)
|
## entries (see `Geth` constant `maxHeadersServe`.)
|
||||||
|
|
||||||
fetchHeadersReqThresholdZombie* = chronos.seconds(2)
|
fetchHeadersReqErrThresholdZombie* = chronos.seconds(2)
|
||||||
fetchHeadersReqThresholdCount* = 3
|
fetchHeadersReqErrThresholdCount* = 3
|
||||||
## Response time allowance. If the response time for the set of headers
|
## Response time allowance. If the response time for the set of headers
|
||||||
## exceeds this threshold for more than `fetchHeadersReqThresholdCount`
|
## exceeds this threshold for more than `fetchHeadersReqThresholdCount`
|
||||||
## times in a row, then this peer will be banned for a while.
|
## times in a row, then this peer will be banned for a while.
|
||||||
|
|
||||||
|
fetchHeadersProcessErrThresholdCount* = 3
|
||||||
|
## Similar to `fetchHeadersReqErrThresholdCount` but for the later part
|
||||||
|
## when errors occur while block headers are queued and further processed.
|
||||||
|
|
||||||
fetchHeadersReqMinResponsePC* = 10
|
fetchHeadersReqMinResponsePC* = 10
|
||||||
## Some peers only returned one header at a time. If these peers sit on a
|
## Some peers only returned one header at a time. If these peers sit on a
|
||||||
## farm, they might collectively slow down the download process. So this
|
## farm, they might collectively slow down the download process. So this
|
||||||
@ -92,10 +96,13 @@ const
|
|||||||
nFetchBodiesRequest* = 128
|
nFetchBodiesRequest* = 128
|
||||||
## Similar to `nFetchHeadersRequest`
|
## Similar to `nFetchHeadersRequest`
|
||||||
|
|
||||||
fetchBodiesReqThresholdZombie* = chronos.seconds(4)
|
fetchBodiesReqErrThresholdZombie* = chronos.seconds(4)
|
||||||
fetchBodiesReqThresholdCount* = 5
|
fetchBodiesReqErrThresholdCount* = 3
|
||||||
## Similar to `fetchHeadersReqThreshold*`
|
## Similar to `fetchHeadersReqThreshold*`
|
||||||
|
|
||||||
|
fetchBodiesProcessErrThresholdCount* = 3
|
||||||
|
## Similar to `fetchHeadersProcessErrThresholdCount`.
|
||||||
|
|
||||||
fetchBodiesReqMinResponsePC* = 10
|
fetchBodiesReqMinResponsePC* = 10
|
||||||
## Similar to `fetchHeadersReqMinResponsePC`
|
## Similar to `fetchHeadersReqMinResponsePC`
|
||||||
|
|
||||||
|
@ -126,8 +126,10 @@ type
|
|||||||
|
|
||||||
BeaconBuddyData* = object
|
BeaconBuddyData* = object
|
||||||
## Local descriptor data extension
|
## Local descriptor data extension
|
||||||
nHdrRespErrors*: int ## Number of errors/slow responses in a row
|
nHdrRespErrors*: uint8 ## Number of errors/slow responses in a row
|
||||||
nBdyRespErrors*: int ## Ditto for bodies
|
nHdrProcErrors*: uint8 ## Number of post processing errors
|
||||||
|
nBdyRespErrors*: uint8 ## Ditto for bodies
|
||||||
|
nBdyProcErrors*: uint8
|
||||||
|
|
||||||
# Debugging and logging.
|
# Debugging and logging.
|
||||||
nMultiLoop*: int ## Number of runs
|
nMultiLoop*: int ## Number of runs
|
||||||
|
Loading…
x
Reference in New Issue
Block a user