Harden handling of unviable forks (#3312)

* Harden handling of unviable forks

In our current handling of unviable forks, we allow peers to send us
blocks that come from a different fork. This is not necessarily an
error, since it can happen naturally, but it does leave the client open
to repeatedly requesting the same unviable fork. Rather than allowing
this to happen, we now give such peers a small negative score; if it
keeps happening, we disconnect them.
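
To make the arithmetic concrete, here is a minimal, self-contained Nim sketch
(using the new peer_scores constants, but a made-up `DemoPeer` type rather than
the real peer object): a fresh peer starts at `NewPeerScore = 300`, each
unviable-fork response costs `PeerScoreUnviableFork = -200`, so the second
offence drops the score below `PeerScoreLowLimit` and the peer is kicked.

```nim
# Illustrative sketch only - the real scoring lives in eth2_network / peer_scores.
const
  NewPeerScore = 300           # initial score for a newly connected peer
  PeerScoreLowLimit = 0        # below this, the peer is kicked
  PeerScoreUnviableFork = -200 # penalty for serving blocks from an unviable fork

type DemoPeer = object
  score: int
  connected: bool

proc updateScore(peer: var DemoPeer, delta: int) =
  peer.score += delta
  if peer.score < PeerScoreLowLimit:
    peer.connected = false     # descored below the limit: disconnect

var peer = DemoPeer(score: NewPeerScore, connected: true)
peer.updateScore(PeerScoreUnviableFork)  # first offence: 100, still connected
doAssert peer.connected
peer.updateScore(PeerScoreUnviableFork)  # second offence: -100, disconnected
doAssert not peer.connected
```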

* keep track of unviable forks in quarantine, to avoid filling it with
known junk
* collect peer scores in single module
* descore peers when they send unviable blocks during sync
* don't give score for duplicate blocks
* increase quarantine size to a level that allows finality to happen
under optimal conditions - this helps avoid downloading the same blocks
over and over in case of an unviable fork
* increase initial score for new peers to make room for one more failure
before disconnection
* log and score invalid/unviable blocks in the request manager too
* avoid ChainDAG dependency in quarantine
* reject gossip blocks with unviable parent
* continue processing unviable sync blocks in order to build the unviable
DAG

* docs

* Update beacon_chain/consensus_object_pools/block_pools_types.nim

* add unviable queue test
Jacek Sieka 2022-01-26 13:20:08 +01:00 committed by GitHub
parent bd0a3a9b10
commit f70aceef37
13 changed files with 472 additions and 200 deletions


@@ -74,6 +74,11 @@ OK: 6/6 Fail: 0/6 Skip: 0/6
+ Reverse order block add & get [Preset: mainnet] OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block quarantine
```diff
+ Unviable smoke test OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## BlockId and helpers
```diff
+ atSlot sanity OK
@@ -361,6 +366,7 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 4/4 Fail: 0/4 Skip: 0/4
## SyncManager test suite
```diff
+ Process all unviable blocks OK
+ [SyncQueue#Backward] Async unordered push test OK
+ [SyncQueue#Backward] Async unordered push with rewind test OK
+ [SyncQueue#Backward] Pass through established limits test OK
@@ -380,7 +386,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
+ [SyncQueue] getLastNonEmptySlot() test OK
+ [SyncQueue] hasEndGap() test OK
```
OK: 18/18 Fail: 0/18 Skip: 0/18
OK: 19/19 Fail: 0/19 Skip: 0/19
## Zero signature sanity checks
```diff
+ SSZ serialization roundtrip of SignedBeaconBlockHeader OK
@@ -451,4 +457,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
OK: 249/253 Fail: 0/253 Skip: 4/253
OK: 251/255 Fail: 0/255 Skip: 4/255


@@ -38,8 +38,8 @@ type
## appears or be discarded if finality obsoletes it
UnviableFork
## Block is from a different history / fork than the one we're interested
## in (based on our finalized checkpoint)
## Block is from a history / fork that does not include our most current
## finalized checkpoint
Duplicate
## We've seen this block already, can't add again
@@ -53,9 +53,6 @@ type
OnFinalizedCallback* =
proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
FetchRecord* = object
root*: Eth2Digest
KeyedBlockRef* = object
# Special wrapper for BlockRef used in ChainDAG.blocks that allows lookup
# by root without keeping a Table that keeps a separate copy of the digest


@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -9,20 +9,26 @@
import
std/[tables],
chronicles,
stew/bitops2,
../spec/forks,
./block_pools_types
../spec/forks
export tables, forks, block_pools_types
export tables, forks
const
MaxMissingItems = 1024
## Arbitrary
MaxOrphans = SLOTS_PER_EPOCH * 3
## Enough for finalization in an alternative fork
MaxUnviables = 16 * 1024
## About a day of blocks - most likely not needed but it's quite cheap..
type
MissingBlock* = object
tries*: int
FetchRecord* = object
root*: Eth2Digest
Quarantine* = object
## Keeps track of unvalidated blocks coming from the network
## and that cannot yet be added to the chain
@@ -32,18 +38,29 @@ type
##
## Trivially invalid blocks may be dropped before reaching this stage.
orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock] ##\
## Blocks that we don't have a parent for - when we resolve the parent, we
## can proceed to resolving the block as well - we index this by root and
## signature such that a block with invalid signature won't cause a block
## with a valid signature to be dropped
orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock]
## Blocks that we don't have a parent for - when we resolve the parent, we
## can proceed to resolving the block as well - we index this by root and
## signature such that a block with invalid signature won't cause a block
## with a valid signature to be dropped
missing*: Table[Eth2Digest, MissingBlock] ##\
## Roots of blocks that we would like to have (either parent_root of
## unresolved blocks or block roots of attestations)
unviable*: OrderedTable[Eth2Digest, tuple[]]
## Unviable blocks are those that come from a history that does not
## include the finalized checkpoint we're currently following, and can
## therefore never be included in our canonical chain - we keep their hash
## around so that we can avoid cluttering the orphans table with their
## descendants - the ChainDAG only keeps track blocks that make up the
## valid and canonical history.
##
## Entries are evicted in FIFO order - recent entries are more likely to
## appear again in attestations and blocks - however, the unviable block
## table is not a complete directory of all unviable blocks circulating -
## only those we have observed, been able to verify as unviable and fit
## in this cache.
logScope:
topics = "quarant"
missing*: Table[Eth2Digest, MissingBlock]
## Roots of blocks that we would like to have (either parent_root of
## unresolved blocks or block roots of attestations)
func init*(T: type Quarantine): T =
T()
@@ -83,70 +100,112 @@ func addMissing*(quarantine: var Quarantine, root: Eth2Digest) =
if quarantine.missing.len >= MaxMissingItems:
return
if root in quarantine.unviable:
# Won't get anywhere with this block
return
# It's not really missing if we're keeping it in the quarantine
if (not anyIt(quarantine.orphans.keys, it[0] == root)):
# If the block is in orphans, we no longer need it
discard quarantine.missing.hasKeyOrPut(root, MissingBlock())
if anyIt(quarantine.orphans.keys, it[0] == root):
return
# Add if it's not there, but don't update missing counter
discard quarantine.missing.hasKeyOrPut(root, MissingBlock())
func removeOrphan*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.orphans.del((signedBlock.root, signedBlock.signature))
func isViableOrphan(
dag: ChainDAGRef, signedBlock: ForkedSignedBeaconBlock): bool =
finalizedSlot: Slot, signedBlock: ForkedSignedBeaconBlock): bool =
# The orphan must be newer than the finalization point so that its parent
# either is the finalized block or more recent
let slot = withBlck(signedBlock): blck.message.slot
slot > dag.finalizedHead.slot
let
slot = getForkedBlockField(signedBlock, slot)
slot > finalizedSlot
func removeOldBlocks(quarantine: var Quarantine, dag: ChainDAGRef) =
var oldBlocks: seq[(Eth2Digest, ValidatorSig)]
func cleanupUnviable(quarantine: var Quarantine) =
while quarantine.unviable.len() >= MaxUnviables:
var toDel: Eth2Digest
for k in quarantine.unviable.keys():
toDel = k
break # Cannot modify while for-looping
quarantine.unviable.del(toDel)
template removeNonviableOrphans(orphans: untyped) =
for k, v in orphans.pairs():
if not isViableOrphan(dag, v):
oldBlocks.add k
func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
if root in quarantine.unviable:
return
for k in oldBlocks:
orphans.del k
quarantine.cleanupUnviable()
removeNonviableOrphans(quarantine.orphans)
# Remove the tree of orphans whose ancestor is unviable - they are now also
# unviable! This helps avoiding junk in the quarantine, because we don't keep
# unviable parents in the DAG and there's no way to tell an orphan from an
# unviable block without the parent.
var
toRemove: seq[(Eth2Digest, ValidatorSig)] # Can't modify while iterating
toCheck = @[root]
while toCheck.len > 0:
let root = toCheck.pop()
for k, v in quarantine.orphans.mpairs():
if getForkedBlockField(v, parent_root) == root:
toCheck.add(k[0])
toRemove.add(k)
elif k[0] == root:
toRemove.add(k)
for k in toRemove:
quarantine.orphans.del k
quarantine.unviable.add(k[0], ())
toRemove.setLen(0)
quarantine.unviable.add(root, ())
func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[(Eth2Digest, ValidatorSig)]
for k, v in quarantine.orphans.pairs():
if not isViableOrphan(finalizedSlot, v):
toDel.add k
for k in toDel:
quarantine.addUnviable k[0]
func clearQuarantine*(quarantine: var Quarantine) =
quarantine.orphans.clear()
quarantine.missing.clear()
quarantine = Quarantine()
# Typically, blocks will arrive in mostly topological order, with some
# out-of-order block pairs. Therefore, it is unhelpful to use either a
# FIFO or LIFO discpline, and since by definition each block gets used
# either 0 or 1 times it's not a cache either. Instead, stop accepting
# new blocks, and rely on syncing to cache up again if necessary. When
# using forward sync, blocks only arrive in an order not requiring the
# quarantine.
# new blocks, and rely on syncing to cache up again if necessary.
#
# For typical use cases, this need not be large, as they're two or three
# blocks arriving out of order due to variable network delays. As blocks
# for future slots are rejected before reaching quarantine, this usually
# will be a block for the last couple of slots for which the parent is a
# likely imminent arrival.
# Since we start forward sync when about one epoch is missing, that's as
# good a number as any.
const MAX_QUARANTINE_ORPHANS = SLOTS_PER_EPOCH
func add*(quarantine: var Quarantine, dag: ChainDAGRef,
signedBlock: ForkedSignedBeaconBlock): bool =
func addOrphan*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: ForkedSignedBeaconBlock): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(dag, signedBlock):
if not isViableOrphan(finalizedSlot, signedBlock):
quarantine.addUnviable(signedBlock.root)
return false
quarantine.removeOldBlocks(dag)
quarantine.cleanupOrphans(finalizedSlot)
let parent_root = getForkedBlockField(signedBlock, parent_root)
if parent_root in quarantine.unviable:
quarantine.unviable.add(signedBlock.root, ())
return true
# Even if the quarantine is full, we need to schedule its parent for
# downloading or we'll never get to the bottom of things
withBlck(signedBlock): quarantine.addMissing(blck.message.parent_root)
quarantine.addMissing(parent_root)
if quarantine.orphans.lenu64 >= MAX_QUARANTINE_ORPHANS:
if quarantine.orphans.lenu64 >= MaxOrphans:
return false
quarantine.orphans[(signedBlock.root, signedBlock.signature)] =
@@ -164,7 +223,7 @@ iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
for k in toRemove:
quarantine.orphans.del k
for k, v in quarantine.orphans:
for k, v in quarantine.orphans.mpairs():
if getForkedBlockField(v, parent_root) == root:
toRemove.add(k)
yield v

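The `unviable` table above is a bounded FIFO cache: `cleanupUnviable` evicts the oldest root once `MaxUnviables` entries are reached, which `OrderedTable` makes cheap because iteration order is insertion order. A small self-contained sketch of that eviction behaviour, with string roots standing in for `Eth2Digest` and a tiny bound instead of `16 * 1024`:

```nim
# FIFO-eviction sketch of the `unviable` cache; string roots stand in for Eth2Digest.
import std/tables

const MaxEntries = 3  # stand-in for MaxUnviables (16 * 1024 in the real code)

proc addUnviable(unviable: var OrderedTable[string, tuple[]], root: string) =
  if root in unviable:
    return
  while unviable.len >= MaxEntries:
    var toDel: string
    for k in unviable.keys():
      toDel = k
      break               # cannot modify the table while iterating
    unviable.del(toDel)   # evict the oldest entry first
  unviable[root] = ()

var unviable = initOrderedTable[string, tuple[]]()
for root in ["a", "b", "c", "d"]:
  unviable.addUnviable(root)

doAssert "a" notin unviable  # oldest root was evicted to make room
doAssert unviable.len == MaxEntries
```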

@@ -63,13 +63,13 @@ type
# Producers
# ----------------------------------------------------------------
blockQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
blockQueue: AsyncQueue[BlockEntry]
# Consumer
# ----------------------------------------------------------------
consensusManager: ref ConsensusManager
validatorMonitor: ref ValidatorMonitor
## Blockchain DAG, AttestationPool and Quarantine
validatorMonitor: ref ValidatorMonitor
getBeaconTime: GetBeaconTimeFn
verifier: BatchVerifier
@@ -101,40 +101,6 @@ proc new*(T: type BlockProcessor,
proc hasBlocks*(self: BlockProcessor): bool =
self.blockQueue.len() > 0
# Enqueue
# ------------------------------------------------------------------------------
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
resfut: Future[Result[void, BlockError]] = nil,
validationDur = Duration()) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to
# constrain their own processing
# Producers:
# - Gossip (when synced)
# - SyncManager (during sync)
# - RequestManager (missing ancestor blocks)
withBlck(blck):
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with
let res = self.consensusManager.dag.addBackfillBlock(blck)
if resFut != nil:
resFut.complete(res)
return
try:
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur,
src: src))
except AsyncQueueFullError:
raiseAssert "unbounded queue"
# Storage
# ------------------------------------------------------------------------------
@@ -143,7 +109,7 @@ proc dumpInvalidBlock*(
if self.dumpEnabled:
dump(self.dumpDirInvalid, signedBlock)
proc dumpBlock*[T](
proc dumpBlock[T](
self: BlockProcessor,
signedBlock: ForkySignedBeaconBlock,
res: Result[T, BlockError]) =
@@ -156,6 +122,32 @@ proc dumpBlock*[T](
else:
discard
proc storeBackfillBlock(
self: var BlockProcessor,
signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] =
# The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root)
let res = self.consensusManager.dag.addBackfillBlock(signedBlock)
if res.isErr():
case res.error
of BlockError.MissingParent:
if signedBlock.message.parent_root in
self.consensusManager.quarantine[].unviable:
# DAG doesn't know about unviable ancestor blocks - we do! Translate
# this to the appropriate error so that sync etc doesn't retry the block
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err(BlockError.UnviableFork)
of BlockError.UnviableFork:
# Track unviables so that descendants can be discarded properly
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
else: discard
res
proc storeBlock*(
self: var BlockProcessor,
src: MsgSource, wallTime: BeaconTime,
@@ -213,13 +205,26 @@ proc storeBlock*(
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr():
if blck.error() == BlockError.MissingParent:
if not self.consensusManager.quarantine[].add(
dag, ForkedSignedBeaconBlock.init(signedBlock)):
case blck.error()
of BlockError.MissingParent:
if signedBlock.message.parent_root in
self.consensusManager.quarantine[].unviable:
# DAG doesn't know about unviable ancestor blocks - we do! Translate
# this to the appropriate error so that sync etc doesn't retry the block
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err(BlockError.UnviableFork)
if not self.consensusManager.quarantine[].addOrphan(
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock)):
debug "Block quarantine full",
blockRoot = shortLog(signedBlock.root),
blck = shortLog(signedBlock.message),
signature = shortLog(signedBlock.signature)
of BlockError.UnviableFork:
# Track unviables so that descendants can be discarded properly
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
else: discard
return blck
@@ -247,6 +252,41 @@ proc storeBlock*(
blck
# Enqueue
# ------------------------------------------------------------------------------
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
resfut: Future[Result[void, BlockError]] = nil,
validationDur = Duration()) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to
# constrain their own processing
# Producers:
# - Gossip (when synced)
# - SyncManager (during sync)
# - RequestManager (missing ancestor blocks)
withBlck(blck):
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with
let res = self.storeBackfillBlock(blck)
if resFut != nil:
resFut.complete(res)
return
try:
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur,
src: src))
except AsyncQueueFullError:
raiseAssert "unbounded queue"
# Event Loop
# ------------------------------------------------------------------------------

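The quarantine, not the `ChainDAG`, is what remembers unviable roots, so the block processor translates `MissingParent` into `UnviableFork` when the parent is already known to be unviable, and records `UnviableFork` results so descendants can be discarded too. A reduced sketch of that translation logic, with string roots and a plain `HashSet` as stand-ins for the real quarantine state:

```nim
# Sketch of the error-translation idea; not the actual BlockProcessor code.
import std/sets

type BlockError = enum
  Invalid, MissingParent, UnviableFork, Duplicate

proc translateStoreError(
    unviable: var HashSet[string],
    blockRoot, parentRoot: string,
    dagError: BlockError): BlockError =
  case dagError
  of MissingParent:
    if parentRoot in unviable:
      # The DAG doesn't know about unviable ancestors, but we do: the block
      # can never become viable, so don't let sync retry it.
      unviable.incl blockRoot
      return UnviableFork
    return MissingParent
  of UnviableFork:
    unviable.incl blockRoot  # track it so descendants can be discarded too
    return UnviableFork
  else:
    return dagError

var known = toHashSet(["0xdead"])
doAssert translateStoreError(known, "0xchild", "0xdead", MissingParent) == UnviableFork
doAssert "0xchild" in known
doAssert translateStoreError(known, "0xother", "0xabcd", MissingParent) == MissingParent
```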

@@ -228,6 +228,7 @@ template validateBeaconBlockBellatrix(
compute_timestamp_at_slot(state.data, signed_beacon_block.message.slot)
if not (signed_beacon_block.message.body.execution_payload.timestamp ==
timestampAtSlot):
quarantine[].addUnviable(signed_beacon_block.root)
return errReject("BeaconBlock: Mismatched execution payload timestamp")
# https://github.com/ethereum/consensus-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_block
@@ -306,9 +307,15 @@ proc validateBeaconBlock*(
# And implicitly:
# [REJECT] The block's parent (defined by block.parent_root) passes validation.
let parent = dag.getBlockRef(signed_beacon_block.message.parent_root).valueOr:
if signed_beacon_block.message.parent_root in quarantine[].unviable:
quarantine[].addUnviable(signed_beacon_block.root)
return errReject("BeaconBlock: parent from unviable fork")
# When the parent is missing, we can't validate the block - we'll queue it
# in the quarantine for later processing
if not quarantine[].add(dag, ForkedSignedBeaconBlock.init(signed_beacon_block)):
if not quarantine[].addOrphan(
dag.finalizedHead.slot,
ForkedSignedBeaconBlock.init(signed_beacon_block)):
debug "Block quarantine full"
return errIgnore("BeaconBlock: Parent not found")
@@ -328,6 +335,8 @@ proc validateBeaconBlock*(
return errIgnore("BeaconBlock: Can't find ancestor")
if not (finalized_checkpoint.root in [ancestor.root, Eth2Digest()]):
quarantine[].addUnviable(signed_beacon_block.root)
return errReject("BeaconBlock: Finalized checkpoint not an ancestor")
# [REJECT] The block is proposed by the expected proposer_index for the
@@ -344,6 +353,8 @@ proc validateBeaconBlock*(
return errIgnore("BeaconBlock: Cannot compute proposer") # internal issue
if uint64(proposer.get()) != signed_beacon_block.message.proposer_index:
quarantine[].addUnviable(signed_beacon_block.root)
return errReject("BeaconBlock: Unexpected proposer proposer")
# [REJECT] The proposer signature, signed_beacon_block.signature, is valid
@@ -355,6 +366,8 @@ proc validateBeaconBlock*(
signed_beacon_block.root,
dag.validatorKey(proposer.get()).get(),
signed_beacon_block.signature):
quarantine[].addUnviable(signed_beacon_block.root)
return errReject("BeaconBlock: Invalid proposer signature")
validateBeaconBlockBellatrix(signed_beacon_block, parent)

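The gossip changes follow one pattern: a consensus-level REJECT (unviable parent, finalized checkpoint not an ancestor, wrong proposer, bad signature) also records the root as unviable so later descendants can be dropped cheaply, whereas a merely unknown parent is an IGNORE and the block is queued as an orphan. A condensed sketch of the parent check only, with simplified stand-in types:

```nim
# Condensed sketch of the gossip-validation branching; types are simplified stand-ins.
import std/sets

type Verdict = enum
  Accept, Ignore, Reject

proc checkParent(
    unviable: var HashSet[string],
    dagBlocks: HashSet[string],
    blockRoot, parentRoot: string): Verdict =
  if parentRoot in unviable:
    # Parent is on a dead fork: reject, and remember this root as unviable too.
    unviable.incl blockRoot
    return Reject
  if parentRoot notin dagBlocks:
    # Parent simply unknown: not a protocol violation, queue as orphan instead.
    return Ignore
  return Accept

var
  unviable = toHashSet(["deadRoot"])
  dagBlocks = toHashSet(["goodRoot"])
doAssert checkParent(unviable, dagBlocks, "a", "deadRoot") == Reject
doAssert checkParent(unviable, dagBlocks, "b", "goodRoot") == Accept
doAssert checkParent(unviable, dagBlocks, "c", "unknown") == Ignore
doAssert "a" in unviable  # the rejected descendant is now known-unviable as well
```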

@@ -30,14 +30,15 @@ import
../spec/datatypes/[phase0, altair, bellatrix],
../spec/[eth2_ssz_serialization, network, helpers, forks],
../validators/keystore_management,
./eth2_discovery, ./peer_pool, ./libp2p_json_serialization
"."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores]
when chronicles.enabledLogLevel == LogLevel.TRACE:
import std/sequtils
export
tables, version, multiaddress, peer_pool, peerinfo, p2pProtocol, connection,
libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery
tables, version, multiaddress, peerinfo, p2pProtocol, connection,
libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery,
peer_pool, peer_scores
logScope:
topics = "networking"
@@ -216,15 +217,6 @@ const
clientId* = "Nimbus beacon node " & fullVersionStr
nodeMetadataFilename = "node-metadata.json"
NewPeerScore = 200
## Score which will be assigned to new connected Peer
PeerScoreLowLimit = 0
## Score after which peer will be kicked
PeerScoreHighLimit = 1000
## Max value of peer's score
PeerScoreInvalidRequest = -500
## This peer is sending malformed or nonsensical data
ConcurrentConnections = 20
## Maximum number of active concurrent connection requests.


@@ -8,6 +8,15 @@
{.push raises: [Defect].}
const
NewPeerScore* = 300
## Score which will be assigned to new connected Peer
PeerScoreLowLimit* = 0
## Score after which peer will be kicked
PeerScoreHighLimit* = 1000
## Max value of peer's score
PeerScoreInvalidRequest* = -500
## This peer is sending malformed or nonsensical data
PeerScoreHeadTooNew* = -100
## The peer reports a head newer than our wall clock slot
PeerScoreNoStatus* = -100
@@ -26,5 +35,11 @@ const
## Peer's response contains incorrect blocks.
PeerScoreBadResponse* = -1000
## Peer's response is not in requested range.
PeerScoreMissingBlocks* = -200
## Peer response contains too many empty blocks.
PeerScoreMissingBlocks* = -25
## Peer response contains too many empty blocks - this can happen either
## because a long reorg happened or the peer is falsely trying to convince
## us that a long reorg happened.
## Peer's `blocksByRange` answer is fine.
PeerScoreUnviableFork* = -200
## Peer responded with blocks from an unviable fork - are they on a
## different chain?


@@ -7,12 +7,13 @@
{.push raises: [Defect].}
import options, sequtils, strutils
import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0, altair],
../spec/datatypes/[phase0],
../spec/forks,
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
"."/sync_protocol, "."/sync_manager
export sync_manager
@@ -82,37 +83,49 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
if blocks.isOk:
let ublocks = blocks.get()
if checkResponse(items, ublocks):
var res: Result[void, BlockError]
if len(ublocks) > 0:
for b in ublocks:
res = await rman.blockVerifier(b)
if res.isErr():
case res.error()
of BlockError.MissingParent:
# Ignoring because the order of the blocks that
# we requested may be different from the order in which we need
# these blocks to apply.
discard
of BlockError.Duplicate, BlockError.UnviableFork:
# Ignoring because these errors could occur due to the
# concurrent/parallel requests we made.
discard
of BlockError.Invalid:
# We stop processing blocks further to avoid DoS attack with big
# chunk of incorrect blocks.
break
else:
res = Result[void, BlockError].ok()
var
gotGoodBlock = false
gotUnviableBlock = false
for b in ublocks:
let ver = await rman.blockVerifier(b)
if ver.isErr():
case ver.error()
of BlockError.MissingParent:
# Ignoring because the order of the blocks that
# we requested may be different from the order in which we need
# these blocks to apply.
discard
of BlockError.Duplicate:
# Ignoring because these errors could occur due to the
# concurrent/parallel requests we made.
discard
of BlockError.UnviableFork:
# If they're working a different fork, we'll want to descore them
# but also process the other blocks (in case we can register the
# other blocks as unviable)
gotUnviableBlock = true
of BlockError.Invalid:
# We stop processing blocks because peer is either sending us
# junk or working a different fork
warn "Received invalid block",
peer = peer, blocks = shortLog(items),
peer_score = peer.getScore()
peer.updateScore(PeerScoreBadBlocks)
return # Stop processing this junk...
else:
gotGoodBlock = true
if gotUnviableBlock:
notice "Received blocks from an unviable fork",
peer = peer, blocks = shortLog(items),
peer_score = peer.getScore()
peer.updateScore(PeerScoreUnviableFork)
elif gotGoodBlock:
# We reward peer only if it returns something.
peer.updateScore(PeerScoreGoodBlocks)
if res.isOk():
if len(ublocks) > 0:
# We reward peer only if it returns something.
peer.updateScore(PeerScoreGoodBlocks)
else:
# We are not penalizing other errors because of the reasons described
# above.
if res.error == BlockError.Invalid:
peer.updateScore(PeerScoreBadBlocks)
else:
peer.updateScore(PeerScoreBadResponse)
else:


@@ -13,9 +13,9 @@ import
../spec/datatypes/[phase0, altair],
../spec/eth2_apis/rpc_types,
../spec/[helpers, forks],
../networking/[peer_pool, eth2_network],
../networking/[peer_pool, peer_scores, eth2_network],
../beacon_clock,
./peer_scores, ./sync_queue
./sync_queue
export phase0, altair, merge, chronos, chronicles, results,
helpers, peer_scores, sync_queue, forks


@@ -15,11 +15,10 @@ import
../spec/[helpers, forks],
../networking/[peer_pool, eth2_network],
../gossip_processing/block_processor,
../consensus_object_pools/block_pools_types,
./peer_scores
../consensus_object_pools/block_pools_types
export base, phase0, altair, merge, chronos, chronicles, results,
block_pools_types, helpers, peer_scores
block_pools_types, helpers
logScope:
topics = "syncqueue"
@@ -63,7 +62,6 @@ type
chunkSize*: uint64
queueSize*: int
counter*: uint64
opcounter*: uint64
pending*: Table[uint64, SyncRequest[T]]
waiters: seq[SyncWaiter]
getSafeSlot*: GetSlotCallback
@@ -570,33 +568,59 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
if processingCb != nil:
processingCb()
template isOkResponse(res: auto): bool =
res.isOk() or res.error in {BlockError.Duplicate, BlockError.UnviableFork}
# Validating received blocks one by one
var res: Result[void, BlockError]
var failSlot: Option[Slot]
if len(item.data) > 0:
for blk in sq.blocks(item):
trace "Pushing block", block_root = blk.root,
block_slot = blk.slot
res = await sq.blockVerifier(blk)
if not res.isOkResponse():
failSlot = some(blk.slot)
var
hasOkBlock = false
hasInvalidBlock = false
unviableBlock: Option[(Eth2Digest, Slot)]
missingParentSlot: Option[Slot]
# compiler segfault if this is moved into the for loop, at time of writing
res: Result[void, BlockError]
for blk in sq.blocks(item):
res = await sq.blockVerifier(blk)
if res.isOk():
hasOkBlock = true
else:
case res.error()
of BlockError.MissingParent:
missingParentSlot = some(blk.slot)
break
else:
res = Result[void, BlockError].ok()
of BlockError.Duplicate:
# Keep going, happens naturally
discard
of BlockError.UnviableFork:
# Keep going so as to register other unviable blocks with the
# quarantine
if unviableBlock.isNone:
# Remember the first unviable block, so we can log it
unviableBlock = some((blk.root, blk.slot))
# Increase progress counter, so watch task will be able to know that we are
# not stuck.
inc(sq.opcounter)
of BlockError.Invalid:
hasInvalidBlock = true
if res.isOkResponse():
let req = item.request
warn "Received invalid sequence of blocks", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
break
# When errors happen while processing blocks, we retry the same request
# with, hopefully, a different peer
let retryRequest =
hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
if not retryRequest:
sq.advanceOutput(item.request.count)
if len(item.data) > 0:
if hasOkBlock:
# If there no error and response was not empty we should reward peer
# with some bonus score.
# with some bonus score - not for duplicate blocks though.
item.request.item.updateScore(PeerScoreGoodBlocks)
sq.wakeupWaiters()
else:
debug "Block pool rejected peer's response", peer = item.request.item,
@@ -604,13 +628,31 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
request_count = item.request.count,
request_step = item.request.step,
blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data), errCode = res.error,
blocks_count = len(item.data),
ok = hasOkBlock,
unviable = unviableBlock.isSome(),
missing_parent = missingParentSlot.isSome(),
direction = item.request.kind, topics = "syncman"
var resetSlot: Option[Slot]
# We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request)
if unviableBlock.isSome:
let req = item.request
notice "Received blocks from an unviable fork",
blockRoot = unviableBlock.get()[0],
blockSlot = unviableBlock.get()[1], peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreUnviableFork)
if missingParentSlot.isSome:
var
resetSlot: Option[Slot]
failSlot = missingParentSlot.get()
case res.error
of BlockError.MissingParent:
# If we got `BlockError.MissingParent` it means that peer returns chain
# of blocks with holes or `block_pool` is in incomplete state. We going
# to rewind to the first slot at latest finalized epoch.
@@ -620,11 +662,11 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
case sq.kind
of SyncQueueKind.Forward:
if safeSlot < req.slot:
let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot)
let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
warn "Unexpected missing parent, rewind happens",
peer = req.item, rewind_to_slot = rewindSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = failSlot.get(),
rewind_fail_slot = failSlot,
finalized_slot = safeSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
@@ -642,11 +684,11 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
req.item.updateScore(PeerScoreBadBlocks)
of SyncQueueKind.Backward:
if safeSlot > req.slot:
let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot)
let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
# It's quite common peers give us fewer blocks than we ask for
info "Gap in block range response, rewinding",
peer = req.item, rewind_to_slot = rewindSlot,
rewind_fail_slot = failSlot.get(),
rewind_fail_slot = failSlot,
finalized_slot = safeSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
@@ -662,32 +704,21 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
of BlockError.Invalid:
let req = item.request
warn "Received invalid sequence of blocks", peer = req.item,
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks)
of BlockError.Duplicate, BlockError.UnviableFork:
raiseAssert "Handled above"
# We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request)
if resetSlot.isSome():
await sq.resetWait(resetSlot)
case sq.kind
of SyncQueueKind.Forward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = sq.rewind.get().failSlot,
reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
of SyncQueueKind.Backward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
if resetSlot.isSome():
await sq.resetWait(resetSlot)
case sq.kind
of SyncQueueKind.Forward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
rewind_epoch_count = sq.rewind.get().epochCount,
rewind_fail_slot = sq.rewind.get().failSlot,
reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
of SyncQueueKind.Backward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
break
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =


@@ -18,6 +18,7 @@ import # Unit test
./test_beacon_time,
./test_block_dag,
./test_block_processor,
./test_block_quarantine,
./test_datatypes,
./test_discovery,
./test_eth1_monitor,


@@ -0,0 +1,59 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
chronicles,
unittest2,
../beacon_chain/spec/forks,
../beacon_chain/spec/datatypes/phase0,
../beacon_chain/consensus_object_pools/block_quarantine
proc makeBlock(slot: Slot, parent: Eth2Digest): ForkedSignedBeaconBlock =
var
b = phase0.SignedBeaconBlock(
message: phase0.BeaconBlock(slot: slot, parent_root: parent))
b.root = hash_tree_root(b.message)
ForkedSignedBeaconBlock.init(b)
suite "Block quarantine":
test "Unviable smoke test":
let
b0 = makeBlock(Slot 0, Eth2Digest())
b1 = makeBlock(Slot 1, b0.root)
b2 = makeBlock(Slot 2, b1.root)
b3 = makeBlock(Slot 3, b2.root)
b4 = makeBlock(Slot 4, b2.root)
var quarantine: Quarantine
quarantine.addMissing(b1.root)
check:
FetchRecord(root: b1.root) in quarantine.checkMissing()
quarantine.addOrphan(Slot 0, b1)
FetchRecord(root: b1.root) notin quarantine.checkMissing()
quarantine.addOrphan(Slot 0, b2)
quarantine.addOrphan(Slot 0, b3)
quarantine.addOrphan(Slot 0, b4)
(b4.root, ValidatorSig()) in quarantine.orphans
quarantine.addUnviable(b4.root)
check:
(b4.root, ValidatorSig()) notin quarantine.orphans
quarantine.addUnviable(b1.root)
check:
(b1.root, ValidatorSig()) notin quarantine.orphans
(b2.root, ValidatorSig()) notin quarantine.orphans
(b3.root, ValidatorSig()) notin quarantine.orphans


@@ -552,6 +552,52 @@ suite "SyncManager test suite":
check waitFor(runTest()) == true
test "Process all unviable blocks":
let
aq = newAsyncQueue[BlockEntry]()
startSlot = Slot(0)
chunkSize = SLOTS_PER_EPOCH
numberOfChunks = 1'u64
finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64)
queueSize = 1
var counter = int(startSlot)
proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
withBlck(sblock.blck):
sblock.fail(BlockError.UnviableFork)
inc(counter)
var
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getFirstSlotAtFinalizedEpoch, collector(aq),
queueSize)
validatorFut = forwardValidator(aq)
let
p1 = SomeTPeer()
proc runTest(): Future[bool] {.async.} =
var r11 = queue.pop(finishSlot, p1)
# Push a single request that will fail with all blocks being unviable
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
discard await f11.withTimeout(100.milliseconds)
check:
f11.finished == true
counter == int(startSlot + chunkSize) # should process all unviable blocks
debtLen(queue) == chunkSize # The range must be retried
await validatorFut.cancelAndWait()
return true
check waitFor(runTest()) == true
test "[SyncQueue#Backward] Async unordered push with rewind test":
let
aq = newAsyncQueue[BlockEntry]()