Sync gaps fix (#4090)

This commit is contained in:
Eugene Kabanov 2022-09-19 12:37:42 +03:00 committed by GitHub
parent abd6581b22
commit 174292b7e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 20 deletions

View File

@ -109,6 +109,7 @@ type
lastMetadataTime*: Moment lastMetadataTime*: Moment
direction*: PeerType direction*: PeerType
disconnectedFut: Future[void] disconnectedFut: Future[void]
statistics*: SyncResponseStats
PeerAddr* = object PeerAddr* = object
peerId*: PeerId peerId*: PeerId
@ -367,6 +368,15 @@ func updateScore*(peer: Peer, score: int) {.inline.} =
if peer.score > PeerScoreHighLimit: if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit peer.score = PeerScoreHighLimit
func updateStats*(peer: Peer, index: SyncResponseKind,
value: uint64) {.inline.} =
## Update peer's ``peer`` specific ``index`` statistics with value ``value``.
peer.statistics.update(index, value)
func getStats*(peer: Peer, index: SyncResponseKind): uint64 {.inline.} =
## Returns current statistics value for peer ``peer`` and index ``index``.
peer.statistics.get(index)
func calcThroughput(dur: Duration, value: uint64): float = func calcThroughput(dur: Duration, value: uint64): float =
let secs = float(chronos.seconds(1).nanoseconds) let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur): if isZero(dur):

View File

@ -44,3 +44,16 @@ const
PeerScoreUnviableFork* = -200 PeerScoreUnviableFork* = -200
## Peer responded with blocks from an unviable fork - are they on a ## Peer responded with blocks from an unviable fork - are they on a
## different chain? ## different chain?
type
SyncResponseKind* {.pure.} = enum
Good, Empty
SyncResponseStats* = array[int(high(SyncResponseKind)) + 1, uint64]
template get*(a: SyncResponseStats, index: SyncResponseKind): uint64 =
a[int(index)]
template update*(a: var SyncResponseStats, index: SyncResponseKind,
value: uint64) =
a[int(index)] += value

View File

@ -46,6 +46,11 @@ type
request*: SyncRequest[T] request*: SyncRequest[T]
data*: seq[ref ForkedSignedBeaconBlock] data*: seq[ref ForkedSignedBeaconBlock]
GapItem*[T] = object
start*: Slot
finish*: Slot
item*: T
SyncWaiter* = ref object SyncWaiter* = ref object
future: Future[void] future: Future[void]
reset: bool reset: bool
@ -64,6 +69,7 @@ type
queueSize*: int queueSize*: int
counter*: uint64 counter*: uint64
pending*: Table[uint64, SyncRequest[T]] pending*: Table[uint64, SyncRequest[T]]
gapList*: seq[GapItem[T]]
waiters: seq[SyncWaiter] waiters: seq[SyncWaiter]
getSafeSlot*: GetSlotCallback getSafeSlot*: GetSlotCallback
debtsQueue: HeapQueue[SyncRequest[T]] debtsQueue: HeapQueue[SyncRequest[T]]
@ -385,6 +391,47 @@ proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
else: else:
sr.data[^1][].slot sr.data[^1][].slot
proc processGap[T](sq: SyncQueue[T], sr: SyncResult[T]) =
if sr.isEmpty():
let gitem = GapItem[T](start: sr.request.slot,
finish: sr.request.slot + sr.request.count - 1'u64,
item: sr.request.item)
sq.gapList.add(gitem)
else:
if sr.hasEndGap():
let gitem = GapItem[T](start: sr.getLastNonEmptySlot() + 1'u64,
finish: sr.request.slot + sr.request.count - 1'u64,
item: sr.request.item)
sq.gapList.add(gitem)
else:
sq.gapList.reset()
proc rewardForGaps[T](sq: SyncQueue[T], score: int) =
mixin updateScore, getStats
logScope:
sync_ident = sq.ident
direction = sq.kind
topics = "syncman"
for gap in sq.gapList:
if score < 0:
# Every empty response increases penalty by 25%, but not more than 200%.
let
emptyCount = gap.item.getStats(SyncResponseKind.Empty)
goodCount = gap.item.getStats(SyncResponseKind.Good)
if emptyCount <= goodCount:
gap.item.updateScore(score)
else:
let
weight = int(min(emptyCount - goodCount, 8'u64))
newScore = score + score * weight div 4
gap.item.updateScore(newScore)
debug "Peer received gap penalty", peer = gap.item,
penalty = newScore
else:
gap.item.updateScore(score)
proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) = proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
sq.debtsQueue.push(sr) sq.debtsQueue.push(sr)
sq.debtsCount = sq.debtsCount + sr.count sq.debtsCount = sq.debtsCount + sr.count
@ -551,7 +598,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
topics = "syncman" topics = "syncman"
## Push successful result to queue ``sq``. ## Push successful result to queue ``sq``.
mixin updateScore mixin updateScore, updateStats, getStats
if sr.index notin sq.pending: if sr.index notin sq.pending:
# If request `sr` not in our pending list, it only means that # If request `sr` not in our pending list, it only means that
@ -611,10 +658,10 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# Validating received blocks one by one # Validating received blocks one by one
var var
hasOkBlock = false
hasInvalidBlock = false hasInvalidBlock = false
unviableBlock: Option[(Eth2Digest, Slot)] unviableBlock: Option[(Eth2Digest, Slot)]
missingParentSlot: Option[Slot] missingParentSlot: Option[Slot]
goodBlock: Option[Slot]
# compiler segfault if this is moved into the for loop, at time of writing # compiler segfault if this is moved into the for loop, at time of writing
# TODO this does segfault in 1.2 but not 1.6, so remove workaround when 1.2 # TODO this does segfault in 1.2 but not 1.6, so remove workaround when 1.2
@ -624,7 +671,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
for blk in sq.blocks(item): for blk in sq.blocks(item):
res = await sq.blockVerifier(blk[]) res = await sq.blockVerifier(blk[])
if res.isOk(): if res.isOk():
hasOkBlock = true goodBlock = some(blk[].slot)
else: else:
case res.error() case res.error()
of BlockError.MissingParent: of BlockError.MissingParent:
@ -654,14 +701,25 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# with, hopefully, a different peer # with, hopefully, a different peer
let retryRequest = let retryRequest =
hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome() hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
if not retryRequest: if not(retryRequest):
let numSlotsAdvanced = item.request.count - sq.numAlreadyKnownSlots(sr) let numSlotsAdvanced = item.request.count - sq.numAlreadyKnownSlots(sr)
sq.advanceOutput(numSlotsAdvanced) sq.advanceOutput(numSlotsAdvanced)
if hasOkBlock: if goodBlock.isSome():
# If there no error and response was not empty we should reward peer # If there no error and response was not empty we should reward peer
# with some bonus score - not for duplicate blocks though. # with some bonus score - not for duplicate blocks though.
item.request.item.updateScore(PeerScoreGoodBlocks) item.request.item.updateScore(PeerScoreGoodBlocks)
item.request.item.updateStats(SyncResponseKind.Good, 1'u64)
# BlockProcessor reports good block, so we can reward all the peers
# who sent us empty responses.
sq.rewardForGaps(PeerScoreGoodBlocks)
sq.gapList.reset()
else:
# Response was empty
item.request.item.updateStats(SyncResponseKind.Empty, 1'u64)
sq.processGap(item)
if numSlotsAdvanced > 0: if numSlotsAdvanced > 0:
sq.wakeupWaiters() sq.wakeupWaiters()
@ -669,13 +727,13 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
debug "Block pool rejected peer's response", request = item.request, debug "Block pool rejected peer's response", request = item.request,
blocks_map = getShortMap(item.request, item.data), blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data), blocks_count = len(item.data),
ok = hasOkBlock, ok = goodBlock.isSome(),
unviable = unviableBlock.isSome(), unviable = unviableBlock.isSome(),
missing_parent = missingParentSlot.isSome() missing_parent = missingParentSlot.isSome()
# We need to move failed response to the debts queue. # We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request) sq.toDebtsQueue(item.request)
if unviableBlock.isSome: if unviableBlock.isSome():
let req = item.request let req = item.request
notice "Received blocks from an unviable fork", request = req, notice "Received blocks from an unviable fork", request = req,
blockRoot = unviableBlock.get()[0], blockRoot = unviableBlock.get()[0],
@ -684,33 +742,53 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
blocks_map = getShortMap(req, item.data) blocks_map = getShortMap(req, item.data)
req.item.updateScore(PeerScoreUnviableFork) req.item.updateScore(PeerScoreUnviableFork)
if missingParentSlot.isSome: if missingParentSlot.isSome():
var var
resetSlot: Option[Slot] resetSlot: Option[Slot]
failSlot = missingParentSlot.get() failSlot = missingParentSlot.get()
# If we got `BlockError.MissingParent` it means that peer returns chain # If we got `BlockError.MissingParent` it means that peer returns chain
# of blocks with holes or `block_pool` is in incomplete state. We going # of blocks with holes or `block_pool` is in incomplete state. We going
# to rewind to the first slot at latest finalized epoch. # to rewind the SyncQueue some distance back (2ⁿ, where n∈[0,∞], but
# no more than `finalized_epoch`).
let let
req = item.request req = item.request
safeSlot = sq.getSafeSlot() safeSlot = sq.getSafeSlot()
gapsCount = len(sq.gapList)
# We should penalize all the peers which responded with gaps.
sq.rewardForGaps(PeerScoreMissingBlocks)
sq.gapList.reset()
case sq.kind case sq.kind
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
if safeSlot < failSlot: if goodBlock.isSome():
# `BlockError.MissingParent` and `Success` present in response,
# it means that we just need to request this range one more time.
debug "Unexpected missing parent, but no rewind needed",
request = req, finalized_slot = safeSlot,
last_good_slot = goodBlock.get(),
missing_parent_slot = missingParentSlot.get(),
blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
gaps_count = gapsCount
req.item.updateScore(PeerScoreMissingBlocks)
else:
if safeSlot < req.slot:
let rewindSlot = sq.getRewindPoint(failSlot, safeSlot) let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
debug "Unexpected missing parent, rewind happens", debug "Unexpected missing parent, rewind happens",
request = req, rewind_to_slot = rewindSlot, request = req, rewind_to_slot = rewindSlot,
rewind_point = sq.rewind, finalized_slot = safeSlot, rewind_point = sq.rewind, finalized_slot = safeSlot,
blocks_count = len(item.data), blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data) blocks_map = getShortMap(req, item.data),
gaps_count = gapsCount
resetSlot = some(rewindSlot) resetSlot = some(rewindSlot)
req.item.updateScore(PeerScoreMissingBlocks)
else: else:
error "Unexpected missing parent at finalized epoch slot", error "Unexpected missing parent at finalized epoch slot",
request = req, rewind_to_slot = safeSlot, request = req, rewind_to_slot = safeSlot,
blocks_count = len(item.data), blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data) blocks_map = getShortMap(req, item.data),
gaps_count = gapsCount
req.item.updateScore(PeerScoreBadBlocks) req.item.updateScore(PeerScoreBadBlocks)
of SyncQueueKind.Backward: of SyncQueueKind.Backward:
if safeSlot > failSlot: if safeSlot > failSlot:

View File

@ -28,6 +28,12 @@ template shortLog(peer: SomeTPeer): string =
proc updateScore(peer: SomeTPeer, score: int) = proc updateScore(peer: SomeTPeer, score: int) =
peer[].score += score peer[].score += score
proc updateStats(peer: SomeTPeer, index: SyncResponseKind, score: uint64) =
discard
proc getStats(peer: SomeTPeer, index: SyncResponseKind): uint64 =
0
func getStaticSlotCb(slot: Slot): GetSlotCallback = func getStaticSlotCb(slot: Slot): GetSlotCallback =
proc getSlot(): Slot = proc getSlot(): Slot =
slot slot