Quarantine and reassembly of gossiped blobs and blocks (#4808)

This commit is contained in:
henridf 2023-04-13 21:11:40 +02:00 committed by GitHub
parent fa3655527c
commit 021de18e06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 267 additions and 42 deletions

View File

@ -0,0 +1,70 @@
# beacon_chain
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/tables,
../spec/datatypes/deneb
const
# Size limit that `put` checks before inserting a new sidecar.
MaxBlobs = SLOTS_PER_EPOCH * MAX_BLOBS_PER_BLOCK
type
# Holding area for blob sidecars, looked up by block root and blob index.
BlobQuarantine* = object
# Sidecars keyed by `(block_root, index)`, which is the key `put` inserts
# them under.
blobs*: Table[(Eth2Digest, BlobIndex), ref BlobSidecar]
func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
  ## Stores `blobSidecar` under its `(block_root, index)` key.
  ##
  ## The sidecar is silently dropped when the quarantine already holds
  ## `MaxBlobs` entries. If an entry already exists for the same key it is
  ## left untouched (`hasKeyOrPut` never overwrites).
  if quarantine.blobs.lenu64 >= MaxBlobs:
    # `>=` rather than `>`, so the table never grows beyond `MaxBlobs`.
    return
  discard quarantine.blobs.hasKeyOrPut(
    (blobSidecar.block_root, blobSidecar.index), blobSidecar)
func blobIndices*(quarantine: BlobQuarantine, digest: Eth2Digest):
    seq[BlobIndex] =
  ## Returns, in ascending order, every blob index below
  ## `MAX_BLOBS_PER_BLOCK` for which a sidecar keyed by `digest` is held.
  for idx in 0 ..< MAX_BLOBS_PER_BLOCK:
    if (digest, idx) in quarantine.blobs:
      result.add(idx)
func hasBlob*(quarantine: BlobQuarantine, blobSidecar: BlobSidecar): bool =
  ## True when a sidecar with the same block root and index is already held.
  let key = (blobSidecar.block_root, blobSidecar.index)
  key in quarantine.blobs
func popBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
    seq[ref BlobSidecar] =
  ## Removes every sidecar keyed by `digest` from the quarantine and returns
  ## them ordered by ascending blob index.
  for idx in 0 ..< MAX_BLOBS_PER_BLOCK:
    var sidecar: ref BlobSidecar
    if quarantine.blobs.pop((digest, idx), sidecar):
      result.add(sidecar)
func peekBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
    seq[ref BlobSidecar] =
  ## Returns the sidecars keyed by `digest`, ordered by ascending blob index,
  ## while leaving them in the quarantine.
  for idx in 0 ..< MAX_BLOBS_PER_BLOCK:
    quarantine.blobs.withValue((digest, idx), sidecar):
      result.add(sidecar[])
func removeBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest) =
  ## Drops every sidecar keyed by `digest`, for each blob index below
  ## `MAX_BLOBS_PER_BLOCK`.
  for idx in 0 ..< MAX_BLOBS_PER_BLOCK:
    let key = (digest, idx)
    quarantine.blobs.del(key)
func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
    bool =
  ## Returns true when the quarantine holds a sidecar for each of the block's
  ## `blob_kzg_commitments`, i.e. the held indices for `blck.root` are exactly
  ## `0, 1, ..., commitments.len - 1`.
  let idxs = quarantine.blobIndices(blck.root)
  if len(blck.message.body.blob_kzg_commitments) != len(idxs):
    return false
  # Iterate over the elements themselves: the former `0..len(idxs)` range was
  # inclusive and read `idxs[len(idxs)]`, one element past the end.
  for i, idx in idxs:
    if idx != uint64(i):
      return false
  true

View File

@ -19,6 +19,8 @@ const
## Arbitrary
MaxOrphans = SLOTS_PER_EPOCH * 3
## Enough for finalization in an alternative fork
MaxBlobless = SLOTS_PER_EPOCH
## Arbitrary
MaxUnviables = 16 * 1024
## About a day of blocks - most likely not needed but it's quite cheap..
@ -39,10 +41,19 @@ type
## Trivially invalid blocks may be dropped before reaching this stage.
orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock]
## Blocks that we don't have a parent for - when we resolve the parent, we
## can proceed to resolving the block as well - we index this by root and
## signature such that a block with invalid signature won't cause a block
## with a valid signature to be dropped
## Blocks that we don't have a parent for - when we resolve the
## parent, we can proceed to resolving the block as well - we
## index this by root and signature such that a block with
## invalid signature won't cause a block with a valid signature
## to be dropped. An orphan block may also be "blobless" (see
## below) - if so, upon resolving the parent, it should be
## stored in the blobless table.
blobless*: Table[(Eth2Digest, ValidatorSig), deneb.SignedBeaconBlock]
## Blocks that we don't have blobs for. When we have received
## all blobs for this block, we can proceed to resolving the
## block as well. A blobless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan).
unviable*: OrderedTable[Eth2Digest, tuple[]]
## Unviable blocks are those that come from a history that does not
@ -115,12 +126,14 @@ func removeOrphan*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.orphans.del((signedBlock.root, signedBlock.signature))
func isViableOrphan(
finalizedSlot: Slot, signedBlock: ForkedSignedBeaconBlock): bool =
func removeBlobless*(
    quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
  ## Removes the `blobless` entry keyed by this block's root and signature.
  let key = (signedBlock.root, signedBlock.signature)
  quarantine.blobless.del(key)
func isViable(
finalizedSlot: Slot, slot: Slot): bool =
# The orphan must be newer than the finalization point so that its parent
# either is the finalized block or more recent
let
slot = getForkedBlockField(signedBlock, slot)
slot > finalizedSlot
func cleanupUnviable(quarantine: var Quarantine) =
@ -131,47 +144,76 @@ func cleanupUnviable(quarantine: var Quarantine) =
break # Cannot modify while for-looping
quarantine.unviable.del(toDel)
func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
if root in quarantine.unviable:
return
quarantine.cleanupUnviable()
func removeUnviableTree[T](quarantine: var Quarantine,
toCheck: var seq[Eth2Digest],
tbl: var Table[(Eth2Digest, ValidatorSig),
T]): seq[Eth2Digest] =
# Remove the tree of orphans whose ancestor is unviable - they are now also
# unviable! This helps avoiding junk in the quarantine, because we don't keep
# unviable parents in the DAG and there's no way to tell an orphan from an
# unviable block without the parent.
var
toRemove: seq[(Eth2Digest, ValidatorSig)] # Can't modify while iterating
toCheck = @[root]
checked: seq[Eth2Digest]
while toCheck.len > 0:
let root = toCheck.pop()
for k, v in quarantine.orphans.mpairs():
if getForkedBlockField(v, parent_root) == root:
if root notin checked:
checked.add(root)
for k, v in tbl.mpairs():
when T is ForkedSignedBeaconBlock:
let blockRoot = getForkedBlockField(v, parent_root)
elif T is ForkySignedBeaconBlock:
let blockRoot = v.message.parent_root
else:
static: doAssert false
if blockRoot == root:
toCheck.add(k[0])
toRemove.add(k)
elif k[0] == root:
toRemove.add(k)
for k in toRemove:
quarantine.orphans.del k
tbl.del k
quarantine.unviable[k[0]] = ()
toRemove.setLen(0)
checked
func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
  ## Records `root` as unviable and runs `removeUnviableTree` first over the
  ## orphans and then, seeded with the roots that pass visited, over the
  ## blobless blocks. Does nothing when `root` is already recorded.
  if root notin quarantine.unviable:
    quarantine.cleanupUnviable()

    var pending = @[root]
    var visited = quarantine.removeUnviableTree(pending, quarantine.orphans)
    discard quarantine.removeUnviableTree(visited, quarantine.blobless)

    quarantine.unviable[root] = ()
func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[(Eth2Digest, ValidatorSig)]
for k, v in quarantine.orphans:
if not isViableOrphan(finalizedSlot, v):
if not isViable(finalizedSlot, getForkedBlockField(v, slot)):
toDel.add k
for k in toDel:
quarantine.addUnviable k[0]
quarantine.orphans.del k
func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
  ## Evicts blobless blocks whose slot is not viable with respect to
  ## `finalizedSlot`, recording each evicted root as unviable.
  var stale: seq[(Eth2Digest, ValidatorSig)] # Can't modify while iterating

  for key, blck in quarantine.blobless:
    if isViable(finalizedSlot, blck.message.slot):
      continue
    stale.add key

  for key in stale:
    quarantine.addUnviable key[0]
    quarantine.blobless.del key
func clearAfterReorg*(quarantine: var Quarantine) =
## Clear missing and orphans to start with a fresh slate in case of a reorg
## Unviables remain unviable and are not cleared.
@ -193,7 +235,7 @@ func addOrphan*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: ForkedSignedBeaconBlock): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(finalizedSlot, signedBlock):
if not isViable(finalizedSlot, getForkedBlockField(signedBlock, slot)):
quarantine.addUnviable(signedBlock.root)
return false
@ -230,3 +272,26 @@ iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
if getForkedBlockField(v, parent_root) == root:
toRemove.add(k)
yield v
proc addBlobless*(
    quarantine: var Quarantine, finalizedSlot: Slot,
    signedBlock: deneb.SignedBeaconBlock): bool =
  ## Stores `signedBlock` in the `blobless` table, keyed by root and
  ## signature, and removes its root from `missing`.
  ##
  ## Returns false without storing when the block's slot is not viable (its
  ## root is then recorded as unviable) or when, after cleaning up stale
  ## entries, the table already holds `MaxBlobless` blocks.
  if not isViable(finalizedSlot, signedBlock.message.slot):
    quarantine.addUnviable(signedBlock.root)
    return false

  quarantine.cleanupBlobless(finalizedSlot)

  let hasRoom = quarantine.blobless.lenu64 < MaxBlobless
  if hasRoom:
    let key = (signedBlock.root, signedBlock.signature)
    quarantine.blobless[key] = signedBlock
    quarantine.missing.del(signedBlock.root)
  hasRoom
iterator peekBlobless*(quarantine: var Quarantine, root: Eth2Digest):
    deneb.SignedBeaconBlock =
  ## Yields every block in the `blobless` table whose key root equals `root`,
  ## without removing it from the table.
  for key, blck in quarantine.blobless.mpairs():
    if key[0] != root:
      continue
    yield blck

View File

@ -22,7 +22,9 @@ from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
EpochRef, VerifierError
from ../consensus_object_pools/block_quarantine import
addOrphan, addUnviable, pop, removeOrphan
addBlobless, addOrphan, addUnviable, pop, removeOrphan
from ../consensus_object_pools/blob_quarantine import
BlobQuarantine, hasBlobs, popBlobs
from ../validators/validator_monitor import
MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
registerSyncAggregateInBlock
@ -91,6 +93,7 @@ type
validatorMonitor: ref ValidatorMonitor
getBeaconTime: GetBeaconTimeFn
blobQuarantine: ref BlobQuarantine
verifier: BatchVerifier
lastPayload: Slot
@ -123,6 +126,7 @@ proc new*(T: type BlockProcessor,
rng: ref HmacDrbgContext, taskpool: TaskPoolPtr,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
blobQuarantine: ref BlobQuarantine,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
@ -131,6 +135,7 @@ proc new*(T: type BlockProcessor,
blockQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
)
@ -564,7 +569,22 @@ proc storeBlock*(
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
withBlck(quarantined):
when typeof(blck).toFork() < ConsensusFork.Deneb:
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
else:
if len(blck.message.body.blob_kzg_commitments) == 0:
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
else:
if self.blobQuarantine[].hasBlobs(blck):
let blobs = self.blobQuarantine[].popBlobs(blck.root)
self[].addBlock(MsgSource.gossip, quarantined, blobs)
else:
if not self.consensusManager.quarantine[].addBlobless(
dag.finalizedHead.slot, blck):
notice "Block quarantine full (blobless)",
blockRoot = shortLog(quarantined.root),
signature = shortLog(quarantined.signature)
return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get

View File

@ -14,8 +14,8 @@ import
../spec/[helpers, forks],
../spec/datatypes/[altair, phase0, deneb],
../consensus_object_pools/[
block_clearance, block_quarantine, blockchain_dag, exit_pool, attestation_pool,
light_client_pool, sync_committee_msg_pool],
blob_quarantine, block_clearance, block_quarantine, blockchain_dag,
exit_pool, attestation_pool, light_client_pool, sync_committee_msg_pool],
../validators/validator_pool,
../beacon_clock,
"."/[gossip_validation, block_processor, batch_validation],
@ -145,6 +145,8 @@ type
# ----------------------------------------------------------------
quarantine*: ref Quarantine
blobQuarantine*: ref BlobQuarantine
# Application-provided current time provider (to facilitate testing)
getCurrentBeaconTime*: GetBeaconTimeFn
@ -167,6 +169,7 @@ proc new*(T: type Eth2Processor,
syncCommitteeMsgPool: ref SyncCommitteeMsgPool,
lightClientPool: ref LightClientPool,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
rng: ref HmacDrbgContext,
getBeaconTime: GetBeaconTimeFn,
taskpool: TaskPoolPtr
@ -184,6 +187,7 @@ proc new*(T: type Eth2Processor,
syncCommitteeMsgPool: syncCommitteeMsgPool,
lightClientPool: lightClientPool,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
getCurrentBeaconTime: getBeaconTime,
batchCrypto: BatchCrypto.new(
rng = rng,
@ -234,9 +238,21 @@ proc processSignedBeaconBlock*(
# propagation of seemingly good blocks
trace "Block validated"
var blobs: BlobSidecars
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
if self.blobQuarantine[].hasBlobs(signedBlock):
blobs = self.blobQuarantine[].popBlobs(signedBlock.root)
else:
if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
signedBlock):
notice "Block quarantine full (blobless)",
blockRoot = shortLog(signedBlock.root),
blck = shortLog(signedBlock.message),
signature = shortLog(signedBlock.signature)
self.blockProcessor[].addBlock(
src, ForkedSignedBeaconBlock.init(signedBlock),
BlobSidecars @[],
blobs,
maybeFinalized = maybeFinalized,
validationDur = nanoseconds(
(self.getCurrentBeaconTime() - wallTime).nanoseconds))
@ -268,23 +284,50 @@ proc processSignedBlobSidecar*(
# Potential under/overflows are fine; would just create odd metrics and logs
let delay = wallTime - signedBlobSidecar.message.slot.start_beacon_time
debug "Blob received", delay
if self.blobQuarantine[].hasBlob(signedBlobSidecar.message):
debug "Blob received, already in quarantine", delay
return ValidationRes.ok
else:
debug "Blob received", delay
let v =
self.dag.validateBlobSidecar(self.quarantine, signedBlobSidecar, wallTime, idx)
self.dag.validateBlobSidecar(self.quarantine,
signedBlobSidecar, wallTime, idx)
if v.isOk():
trace "Blob validated"
# TODO
# hand blob off to blob quarantine
blob_sidecars_received.inc()
blob_sidecar_delay.observe(delay.toFloatSeconds())
else:
if v.isErr():
debug "Dropping blob", error = v.error()
blob_sidecars_dropped.inc(1, [$v.error[0]])
return v
debug "Blob validated, putting in blob quarantine"
self.blobQuarantine[].put(newClone(signedBlobSidecar.message))
var toAdd: seq[deneb.SignedBeaconBlock]
var skippedBlocks = false
for blobless in self.quarantine[].peekBlobless(
signedBlobSidecar.message.block_root):
if self.blobQuarantine[].hasBlobs(blobless):
toAdd.add(blobless)
else:
skippedBlocks = true
if len(toAdd) > 0:
let blobs = self.blobQuarantine[].peekBlobs(
signedBlobSidecar.message.block_root)
for blobless in toAdd:
self.blockProcessor[].addBlock(
MsgSource.gossip,
ForkedSignedBeaconBlock.init(blobless), blobs)
self.quarantine[].removeBlobless(blobless)
if not skippedBlocks:
# no blobless blocks remain in quarantine that need these blobs,
# so we can remove them.
self.blobQuarantine[].removeBlobs(toAdd[0].root)
blob_sidecars_received.inc()
blob_sidecar_delay.observe(delay.toFloatSeconds())
v

View File

@ -14,6 +14,7 @@ import
stew/[byteutils, io2],
eth/p2p/discoveryv5/[enr, random2],
eth/keys,
./consensus_object_pools/blob_quarantine,
./consensus_object_pools/vanity_logs/vanity_logs,
./networking/topic_params,
./rpc/[rest_api, state_ttl_cache],
@ -319,6 +320,7 @@ proc initFullNode(
LightClientPool())
validatorChangePool = newClone(
ValidatorChangePool.init(dag, attestationPool, onVoluntaryExitAdded))
blobQuarantine = newClone(BlobQuarantine())
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(rng, config.subscribeAllSubnets),
@ -326,7 +328,8 @@ proc initFullNode(
config.defaultFeeRecipient, config.suggestedGasLimit)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime)
rng, taskpool, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime)
blockVerifier =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] =
@ -356,7 +359,7 @@ proc initFullNode(
config.doppelgangerDetection,
blockProcessor, node.validatorMonitor, dag, attestationPool,
validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
lightClientPool, quarantine, rng, getBeaconTime, taskpool)
lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool)
syncManager = newSyncManager[Peer, PeerId](
node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
@ -1348,6 +1351,9 @@ proc onSecond(node: BeaconNode, time: Moment) =
notice "Shutting down after having reached the target synced epoch"
bnStatus = BeaconNodeStatus.Stopping
# TODO
# onSecond timer to handle missing blobs, similar to above for blocks
proc runOnSecondLoop(node: BeaconNode) {.async.} =
const
sleepTime = chronos.seconds(1)

View File

@ -17,8 +17,8 @@ import
../beacon_chain/spec/datatypes/deneb,
../beacon_chain/gossip_processing/block_processor,
../beacon_chain/consensus_object_pools/[
attestation_pool, blockchain_dag, block_quarantine, block_clearance,
consensus_manager],
attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
block_clearance, consensus_manager],
../beacon_chain/eth1/eth1_monitor,
./testutil, ./testdbutil, ./testblockutil
@ -41,6 +41,7 @@ suite "Block processor" & preset():
taskpool = Taskpool.new()
verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
quarantine = newClone(Quarantine.init())
blobQuarantine = newClone(BlobQuarantine())
attestationPool = newClone(AttestationPool.init(dag, quarantine))
elManager = new ELManager # TODO: initialise this properly
actionTracker: ActionTracker
@ -56,7 +57,7 @@ suite "Block processor" & preset():
getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time()
processor = BlockProcessor.new(
false, "", "", keys.newRng(), taskpool, consensusManager,
validatorMonitor, getTimeFn)
validatorMonitor, blobQuarantine, getTimeFn)
asyncTest "Reverse order block add & get" & preset():
let missing = await processor.storeBlock(

View File

@ -20,6 +20,13 @@ func makeBlock(slot: Slot, parent: Eth2Digest): ForkedSignedBeaconBlock =
b.root = hash_tree_root(b.message)
ForkedSignedBeaconBlock.init(b)
func makeBlobbyBlock(slot: Slot, parent: Eth2Digest): deneb.SignedBeaconBlock =
  ## Test helper: builds a Deneb signed block with only `slot` and
  ## `parent_root` set, then fills in `root` from the message.
  result = deneb.SignedBeaconBlock(
    message: deneb.BeaconBlock(slot: slot, parent_root: parent))
  result.root = hash_tree_root(result.message)
suite "Block quarantine":
test "Unviable smoke test":
let
@ -28,6 +35,8 @@ suite "Block quarantine":
b2 = makeBlock(Slot 2, b1.root)
b3 = makeBlock(Slot 3, b2.root)
b4 = makeBlock(Slot 4, b2.root)
b5 = makeBlobbyBlock(Slot 4, b3.root)
b6 = makeBlobbyBlock(Slot 4, b4.root)
var quarantine: Quarantine
@ -43,16 +52,27 @@ suite "Block quarantine":
quarantine.addOrphan(Slot 0, b3)
quarantine.addOrphan(Slot 0, b4)
quarantine.addBlobless(Slot 0, b5)
quarantine.addBlobless(Slot 0, b6)
(b4.root, ValidatorSig()) in quarantine.orphans
(b5.root, ValidatorSig()) in quarantine.blobless
(b6.root, ValidatorSig()) in quarantine.blobless
quarantine.addUnviable(b4.root)
check:
(b4.root, ValidatorSig()) notin quarantine.orphans
(b5.root, ValidatorSig()) in quarantine.blobless
(b6.root, ValidatorSig()) notin quarantine.blobless
quarantine.addUnviable(b1.root)
check:
(b1.root, ValidatorSig()) notin quarantine.orphans
(b2.root, ValidatorSig()) notin quarantine.orphans
(b3.root, ValidatorSig()) notin quarantine.orphans
(b5.root, ValidatorSig()) notin quarantine.blobless
(b6.root, ValidatorSig()) notin quarantine.blobless