mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-17 00:47:03 +00:00
Replace zero-point rewind with rewind to latest finalized epoch's first slot. (#1176)
* Replace zero-point rewind with rewind to latest finalized epoch's first slot. * Fix tests. * Add missing penalty for MissingParent Fix comments.
This commit is contained in:
parent
f3fd7e17a3
commit
96f26c447c
@ -25,7 +25,7 @@ const
|
||||
## Peer's response contains incorrect blocks.
|
||||
PeerScoreBadResponse* = -1000
|
||||
## Peer's response is not in requested range.
|
||||
PeerScoreJokeBlocks* = -200
|
||||
PeerScoreMissingBlocks* = -200
|
||||
## Peer response contains too many empty blocks.
|
||||
|
||||
type
|
||||
@ -78,7 +78,6 @@ type
|
||||
debtsQueue: HeapQueue[SyncRequest[T]]
|
||||
debtsCount: uint64
|
||||
readyQueue: HeapQueue[SyncResult[T]]
|
||||
zeroPoint: Option[Slot]
|
||||
suspects: seq[SyncResult[T]]
|
||||
|
||||
SyncManager*[A, B] = ref object
|
||||
@ -260,34 +259,8 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
|
||||
# missing" error. Lets call such peers "jokers", because they are joking
|
||||
# with responses.
|
||||
#
|
||||
# To fix "joker" problem i'm going to introduce "zero-point" which will
|
||||
# represent first non-empty slot in gap at the end of requested chunk.
|
||||
# If SyncQueue receives chunk of blocks with gap at the end and this chunk
|
||||
# will be successfully processed by `block_pool` it will set `zero_point` to
|
||||
# the first uncertain (empty) slot. For example:
|
||||
#
|
||||
# Case 1
|
||||
# X X X X X -
|
||||
# 3 4 5 6 7 8
|
||||
#
|
||||
# Case2
|
||||
# X X - - - -
|
||||
# 3 4 5 6 7 8
|
||||
#
|
||||
# In Case 1 `zero-point` will be equal to 8, in Case 2 `zero-point` will be
|
||||
# set to 5.
|
||||
#
|
||||
# When `zero-point` is set and the next received chunk of blocks will be
|
||||
# empty, then peer produced this chunk of blocks will be added to suspect
|
||||
# list.
|
||||
#
|
||||
# If the next chunk of blocks has at least one non-empty block and this chunk
|
||||
# will be successfully processed by `block_pool`, then `zero-point` will be
|
||||
# reset and suspect list will be cleared.
|
||||
#
|
||||
# If the `block_pool` failed to process next chunk of blocks, SyncQueue will
|
||||
# perform rollback to `zero-point` and penalize all the peers in suspect list.
|
||||
|
||||
# To fix "joker" problem we going to perform rollback to the latest finalized
|
||||
# epoch's first slot.
|
||||
doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
|
||||
result = SyncQueue[T](
|
||||
startSlot: start,
|
||||
@ -372,10 +345,10 @@ proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
|
||||
# without missing any blocks. There 3 sources:
|
||||
# 1. Debts queue.
|
||||
# 2. Processing queue (`inpSlot`, `outSlot`).
|
||||
# 3. Requested slot `toSlot` (which can be `zero-point` slot).
|
||||
# 3. Requested slot `toSlot`.
|
||||
#
|
||||
# Queue's `outSlot` is the lowest slot we added to `block_pool`, but
|
||||
# `zero-point` slot can be less then `outSlot`. `debtsQueue` holds only not
|
||||
# `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not
|
||||
# added slot requests, so it can't be bigger then `outSlot` value.
|
||||
var minSlot = sq.outSlot
|
||||
if toSlot.isSome():
|
||||
@ -441,8 +414,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
if res:
|
||||
continue
|
||||
else:
|
||||
# SyncQueue reset happens (it can't be `zero-point` reset, or continous
|
||||
# failure reset). We are exiting to wake up sync-worker.
|
||||
# SyncQueue reset happens. We are exiting to wake up sync-worker.
|
||||
exitNow = true
|
||||
break
|
||||
let syncres = SyncResult[T](request: sr, data: data)
|
||||
@ -460,55 +432,6 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
let item = sq.readyQueue.pop()
|
||||
let res = sq.syncUpdate(item.request, item.data)
|
||||
if res.isOk:
|
||||
if sq.zeroPoint.isSome():
|
||||
if item.isEmpty():
|
||||
# If the `zeropoint` is set and response is empty, we will add this
|
||||
# request to suspect list.
|
||||
debug "Adding peer to suspect list", peer = item.request.item,
|
||||
request_slot = item.request.slot,
|
||||
request_count = item.request.count,
|
||||
request_step = item.request.step,
|
||||
response_count = len(item.data), topics = "syncman"
|
||||
sq.suspects.add(item)
|
||||
else:
|
||||
# If the `zeropoint` is set and response is not empty, we will clean
|
||||
# suspect list and reset `zeropoint`.
|
||||
sq.suspects.setLen(0)
|
||||
sq.zeroPoint = none[Slot]()
|
||||
# At this point `zeropoint` is unset, but received response can have
|
||||
# gap at the end.
|
||||
if item.hasEndGap():
|
||||
debug "Zero-point reset and new zero-point found",
|
||||
peer = item.request.item, request_slot = item.request.slot,
|
||||
request_count = item.request.count,
|
||||
request_step = item.request.step,
|
||||
response_count = len(item.data),
|
||||
blocks_map = getShortMap(item.request, item.data),
|
||||
topics = "syncman"
|
||||
sq.suspects.add(item)
|
||||
sq.zeroPoint = some(item.getLastNonEmptySlot())
|
||||
else:
|
||||
debug "Zero-point reset", peer = item.request.item,
|
||||
request_slot = item.request.slot,
|
||||
request_count = item.request.count,
|
||||
request_step = item.request.step,
|
||||
response_count = len(item.data),
|
||||
blocks_map = getShortMap(item.request, item.data),
|
||||
topics = "syncman"
|
||||
else:
|
||||
# If the `zeropoint` is not set and response has gap at the end, we
|
||||
# will add first suspect to the suspect list and set `zeropoint`.
|
||||
if item.hasEndGap():
|
||||
debug "New zero-point found", peer = item.request.item,
|
||||
request_slot = item.request.slot,
|
||||
request_count = item.request.count,
|
||||
request_step = item.request.step,
|
||||
response_count = len(item.data),
|
||||
blocks_map = getShortMap(item.request, item.data),
|
||||
topics = "syncman"
|
||||
sq.suspects.add(item)
|
||||
sq.zeroPoint = some(item.getLastNonEmptySlot())
|
||||
|
||||
sq.outSlot = sq.outSlot + item.request.count
|
||||
sq.wakeupWaiters()
|
||||
else:
|
||||
@ -522,63 +445,26 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
var resetSlot: Option[Slot]
|
||||
|
||||
if res.error == BlockError.MissingParent:
|
||||
if sq.zeroPoint.isSome():
|
||||
# If the `zeropoint` is set and we are unable to store response in
|
||||
# `block_pool` we are going to revert suspicious responses list.
|
||||
|
||||
# If `zeropoint` is set, suspicious list should not be empty.
|
||||
var req: SyncRequest[T]
|
||||
if isEmpty(sq.suspects[0]):
|
||||
# If initial suspicious response is an empty list, then previous
|
||||
# chunk of blocks did not have a gap at the end. So we are going to
|
||||
# request suspicious response one more time without any changes.
|
||||
req = sq.suspects[0].request
|
||||
else:
|
||||
# If initial suspicious response is not an empty list, we are going
|
||||
# to request only gap at the end of the suspicious response.
|
||||
let startSlot = sq.suspects[0].getLastNonEmptySlot() + 1'u64
|
||||
let lastSlot = sq.suspects[0].request.lastSlot()
|
||||
req = SyncRequest.init(T, startSlot, lastSlot)
|
||||
|
||||
debug "Resolve joker's problem", request_slot = req.slot,
|
||||
request_count = req.count,
|
||||
request_step = req.step,
|
||||
suspects_count = (len(sq.suspects) - 1)
|
||||
|
||||
sq.suspects[0].request.item.updateScore(PeerScoreJokeBlocks)
|
||||
|
||||
sq.toDebtsQueue(req)
|
||||
# We move all left suspicious responses to the debts queue.
|
||||
if len(sq.suspects) > 1:
|
||||
for i in 1 ..< len(sq.suspects):
|
||||
sq.toDebtsQueue(sq.suspects[i].request)
|
||||
sq.suspects[i].request.item.updateScore(PeerScoreJokeBlocks)
|
||||
|
||||
# Reset state to the `zeropoint`.
|
||||
sq.suspects.setLen(0)
|
||||
resetSlot = sq.zeroPoint
|
||||
sq.zeroPoint = none[Slot]()
|
||||
# If we got `BlockError.MissingParent` it means that peer returns chain
|
||||
# of blocks with holes or `block_pool` is in incomplete state. We going
|
||||
# to rewind to the first slot at latest finalized epoch.
|
||||
let req = item.request
|
||||
let finalizedSlot = sq.getFirstSlotAFE()
|
||||
if finalizedSlot < req.slot:
|
||||
warn "Unexpected missing parent, rewind happens",
|
||||
peer = req.item, rewind_to_slot = finalizedSlot,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
resetSlot = some(finalizedSlot)
|
||||
req.item.updateScore(PeerScoreMissingBlocks)
|
||||
else:
|
||||
# If we got `BlockError.MissingParent` and `zero-point` is not set
|
||||
# it means that peer returns chain of blocks with holes or block_pool
|
||||
# in incorrect state. We going to rewind to the latest finalized
|
||||
# epoch.
|
||||
let req = item.request
|
||||
let finalizedSlot = sq.getFirstSlotAFE()
|
||||
if finalizedSlot < req.slot:
|
||||
warn "Unexpected missing parent, rewind to latest finalized epoch slot",
|
||||
peer = req.item, to_slot = finalizedSlot,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
await sq.resetWait(some(finalizedSlot))
|
||||
else:
|
||||
error "Unexpected missing parent at finalized epoch slot",
|
||||
peer = req.item, to_slot = finalizedSlot,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
req.item.updateScore(PeerScoreBadBlocks)
|
||||
error "Unexpected missing parent at finalized epoch slot",
|
||||
peer = req.item, to_slot = finalizedSlot,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
req.item.updateScore(PeerScoreBadBlocks)
|
||||
elif res.error == BlockError.Invalid:
|
||||
let req = item.request
|
||||
warn "Received invalid sequence of blocks", peer = req.item,
|
||||
@ -598,8 +484,9 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
sq.toDebtsQueue(item.request)
|
||||
if resetSlot.isSome():
|
||||
await sq.resetWait(resetSlot)
|
||||
debug "Zero-point reset happens", queue_input_slot = sq.inpSlot,
|
||||
queue_output_slot = sq.outSlot
|
||||
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
|
||||
queue_input_slot = sq.inpSlot,
|
||||
queue_output_slot = sq.outSlot
|
||||
break
|
||||
|
||||
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
|
||||
|
Loading…
x
Reference in New Issue
Block a user