fork choice refresh (#1520)

* add attestation processing queue so attestations don't get processed
too early
* rework justified slot delay to match spec / lighthouse better
* keep less state in fork choice
* request epochref less
This commit is contained in:
Jacek Sieka 2020-08-17 20:36:13 +02:00 committed by GitHub
parent 17ca72cf55
commit 79ff4f7c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 185 additions and 109 deletions

View File

@ -68,12 +68,14 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin
forkChoice: forkChoice
)
func processAttestation(
pool: var AttestationPool, participants: HashSet[ValidatorIndex],
block_root: Eth2Digest, target_epoch: Epoch) =
proc processAttestation(
pool: var AttestationPool, slot: Slot, participants: HashSet[ValidatorIndex],
block_root: Eth2Digest, target: Checkpoint, wallSlot: Slot) =
# Add attestation votes to fork choice
for validator in participants:
pool.forkChoice.process_attestation(validator, block_root, target_epoch)
if (let v = pool.forkChoice.on_attestation(
pool.chainDag, slot, block_root, toSeq(participants), target, wallSlot);
v.isErr):
warn "Couldn't process attestation", err = v.error()
func addUnresolved*(pool: var AttestationPool, attestation: Attestation) =
pool.unresolved[attestation.data.beacon_block_root] =
@ -160,7 +162,8 @@ proc addResolved(
a.validations.add(validation)
pool.processAttestation(
participants, a.blck.root, attestation.data.target.epoch)
attestation.data.slot, participants, attestation.data.beacon_block_root,
attestation.data.target, wallSlot)
info "Attestation resolved",
attestation = shortLog(attestation),
@ -178,7 +181,8 @@ proc addResolved(
validations: @[validation]
))
pool.processAttestation(
participants, blck.root, attestation.data.target.epoch)
attestation.data.slot, participants, attestation.data.beacon_block_root,
attestation.data.target, wallSlot)
info "Attestation resolved",
attestation = shortLog(attestation),
@ -346,7 +350,7 @@ proc resolve*(pool: var AttestationPool, wallSlot: Slot) =
pool.addResolved(a.blck, a.attestation, wallSlot)
proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef =
let newHead = pool.forkChoice.find_head(wallSlot)
let newHead = pool.forkChoice.get_head(pool.chainDag, wallSlot)
if newHead.isErr:
error "Couldn't select head", err = newHead.error

View File

@ -80,11 +80,11 @@ proc initForkChoice*(finalizedState: StateData,
let finalized_epoch = finalizedState.data.data.get_current_epoch()
let ffgCheckpoint = FFGCheckpoints(
justified: BalanceCheckpoint(
blck: finalizedState.blck,
epochRef: epochRef),
finalized: Checkpoint(root: finalizedState.blck.root, epoch: finalized_epoch))
let
justified = BalanceCheckpoint(
blck: finalizedState.blck, epochRef: epochRef)
finalized = Checkpoint(
root: finalizedState.blck.root, epoch: finalized_epoch)
let backend = ? initForkChoiceBackend(
finalized_epoch, finalized_epoch, finalizedState.blck.root)
@ -92,9 +92,10 @@ proc initForkChoice*(finalizedState: StateData,
ok(ForkChoice(
backend: backend,
checkpoints: Checkpoints(
current: ffgCheckpoint,
best: ffgCheckpoint),
finalizedBlock: finalizedState.blck,
justified: justified,
best_justified:
Checkpoint(root: justified.blck.root, epoch: justified.epochRef.epoch),
finalized: finalized)
))
func extend[T](s: var seq[T], minLen: int) =
@ -104,6 +105,38 @@ func extend[T](s: var seq[T], minLen: int) =
if s.len < minLen:
s.setLen(minLen)
proc compute_slots_since_epoch_start(slot: Slot): uint64 =
slot - slot.epoch().compute_start_slot_at_epoch()
proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void] =
if self.time > time:
return err(ForkChoiceError(kind: fcInconsistentTick))
let newEpoch = self.time.epoch() != time.epoch()
self.time = time
if newEpoch and
self.best_justified.epoch > self.justified.epochRef.epoch:
let blck = dag.getRef(self.best_justified.root)
if blck.isNil:
return err(ForkChoiceError(
kind: fcJustifiedNodeUnknown, block_root: self.best_justified.root))
self.justified = BalanceCheckpoint(
blck: blck,
epochRef: dag.getEpochRef(blck, self.best_justified.epoch))
ok()
proc process_attestation_queue(self: var ForkChoice) {.gcsafe.}
proc update_time(self: var ForkChoice, dag: ChainDAGRef, time: Slot): FcResult[void] =
while time > self.checkpoints.time:
? on_tick(self.checkpoints, dag, self.checkpoints.time + 1)
self.process_attestation_queue()
ok()
func process_attestation*(
self: var ForkChoiceBackend,
validator_index: ValidatorIndex,
@ -129,13 +162,19 @@ func process_attestation*(
validator_index = validator_index,
new_vote = shortLog(vote)
func process_attestation*(
self: var ForkChoice,
validator_index: ValidatorIndex,
block_root: Eth2Digest,
target_epoch: Epoch
) =
self.backend.process_attestation(validator_index, block_root, target_epoch)
proc process_attestation_queue(self: var ForkChoice) =
var
keep: seq[QueuedAttestation]
for attestation in self.queuedAttestations:
if attestation.slot < self.checkpoints.time:
for validator_index in attestation.attesting_indices:
self.backend.process_attestation(
validator_index, attestation.block_root,
attestation.target_epoch)
else:
keep.add attestation
self.queuedAttestations = keep
func contains*(self: ForkChoiceBackend, block_root: Eth2Digest): bool =
## Returns `true` if a block is known to the fork choice
@ -144,10 +183,64 @@ func contains*(self: ForkChoiceBackend, block_root: Eth2Digest): bool =
## In particular, before adding a block, its parent must be known to the fork choice
self.proto_array.indices.contains(block_root)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#on_attestation
proc on_attestation*(
self: var ForkChoice,
dag: ChainDAGRef,
slot: Slot,
beacon_block_root: Eth2Digest,
attesting_indices: openArray[ValidatorIndex],
target: Checkpoint,
wallSlot: Slot
): FcResult[void] =
? self.update_time(dag, wallSlot)
if beacon_block_root == Eth2Digest():
return ok()
if slot < self.checkpoints.time:
for validator_index in attesting_indices:
self.backend.process_attestation(
validator_index, beacon_block_root, target.epoch)
else:
self.queued_attestations.add(QueuedAttestation(
slot: slot,
attesting_indices: @attesting_indices,
block_root: beacon_block_root,
target_epoch: target.epoch))
ok()
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#should_update_justified_checkpoint
proc should_update_justified_checkpoint(
self: var Checkpoints,
dag: ChainDAGRef,
epochRef: EpochRef): FcResult[bool] =
if compute_slots_since_epoch_start(self.time) < SAFE_SLOTS_TO_UPDATE_JUSTIFIED:
return ok(true)
let
justified_slot = compute_start_slot_at_epoch(self.justified.epochRef.epoch)
let new_justified_checkpoint = epochRef.current_justified_checkpoint;
let justified_blck = dag.getRef(new_justified_checkpoint.root)
if justified_blck.isNil:
return err(ForkChoiceError(
kind: fcJustifiedNodeUnknown, block_root: new_justified_checkpoint.root))
let justified_ancestor =
justified_blck.atSlot(justified_slot)
if justified_ancestor.blck.root != self.justified.blck.root:
return ok(false)
ok(true)
proc process_state(self: var Checkpoints,
dag: ChainDAGRef,
epochRef: EpochRef,
blck: BlockRef) =
blck: BlockRef): FcResult[void] =
let
state_justified_epoch = epochRef.current_justified_checkpoint.epoch
state_finalized_epoch = epochRef.finalized_checkpoint.epoch
@ -155,70 +248,39 @@ proc process_state(self: var Checkpoints,
trace "Processing epoch",
epoch = epochRef.epoch,
state_justified_epoch = state_justified_epoch,
current_justified = self.current.justified.epochRef.epoch,
current_justified = self.justified.epochRef.epoch,
state_finalized_epoch = state_finalized_epoch,
current_finalized = self.current.finalized
current_finalized = self.finalized.epoch
if (state_justified_epoch > self.current.justified.epochRef.epoch) and
(state_finalized_epoch >= self.current.finalized.epoch):
let justifiedBlck = blck.atEpochStart(state_justified_epoch)
if state_justified_epoch > self.justified.epochRef.epoch:
if state_justified_epoch > self.best_justified.epoch:
self.best_justified = epochRef.current_justified_checkpoint
doAssert justifiedBlck.blck.root == epochRef.current_justified_checkpoint.root
if ? should_update_justified_checkpoint(self, dag, epochRef):
let justifiedBlck = blck.atEpochStart(state_justified_epoch)
let candidate = FFGCheckpoints(
justified: BalanceCheckpoint(
self.justified =
BalanceCheckpoint(
blck: justifiedBlck.blck,
epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch),
),
finalized: epochRef.finalized_checkpoint,
)
epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch))
trace "Applying candidate",
justified_block = shortLog(candidate.justified.blck),
justified_epoch = shortLog(candidate.justified.epochRef.epoch),
finalized = candidate.finalized,
state_finalized = state_finalized_epoch
if state_finalized_epoch > self.finalized.epoch:
self.finalized = epochRef.finalized_checkpoint
if self.current.justified.blck.isAncestorOf(justifiedBlck.blck):
trace "Updating current",
prev = shortLog(self.current.justified.blck)
self.current = candidate
else:
trace "No current update",
prev = shortLog(self.current.justified.blck)
if self.justified.epochRef.epoch != state_justified_epoch or
self.justified.blck.root != epochRef.current_justified_checkpoint.root:
if candidate.justified.epochRef.epoch > self.best.justified.epochRef.epoch:
trace "Updating best",
prev = shortLog(self.best.justified.blck)
self.best = candidate
else:
trace "No best update",
prev = shortLog(self.best.justified.blck)
if (state_justified_epoch > self.justified.epochRef.epoch) or
(self.justified.blck.atEpochStart(self.finalized.epoch).blck.root !=
self.finalized.root):
# self.balances_cache.process_state(block_root, state)?;
let justifiedBlck = blck.atEpochStart(state_justified_epoch)
func compute_slots_since_epoch_start(slot: Slot): uint64 =
slot - compute_start_slot_at_epoch(compute_epoch_at_slot(slot))
proc maybe_update(self: var Checkpoints, current_slot: Slot) =
trace "Updating checkpoint",
current_slot,
best = shortLog(self.best.justified.blck),
current = shortLog(self.current.justified.blck),
updateAt = self.updateAt
if self.best.justified.epochRef.epoch > self.current.justified.epochRef.epoch:
let current_epoch = current_slot.compute_epoch_at_slot()
if self.update_at.isNone():
if self.best.justified.epochRef.epoch > self.current.justified.epochRef.epoch:
if compute_slots_since_epoch_start(current_slot) < SAFE_SLOTS_TO_UPDATE_JUSTIFIED:
self.current = self.best
else:
self.update_at = some(current_epoch + 1)
elif self.updateAt.get() <= current_epoch:
self.current = self.best
self.update_at = none(Epoch)
self.justified =
BalanceCheckpoint(
blck: justifiedBlck.blck,
epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch))
ok()
proc process_block*(self: var ForkChoiceBackend,
block_root: Eth2Digest,
@ -235,9 +297,8 @@ proc process_block*(self: var ForkChoice,
blckRef: BlockRef,
blck: SomeBeaconBlock,
wallSlot: Slot): FcResult[void] =
process_state(self.checkpoints, dag, epochRef, blckRef)
maybe_update(self.checkpoints, wallSlot)
? update_time(self, dag, wallSlot)
? process_state(self.checkpoints, dag, epochRef, blckRef)
for attestation in blck.body.attestations:
let targetBlck = dag.getRef(attestation.data.target.root)
@ -251,14 +312,15 @@ proc process_block*(self: var ForkChoice,
epochRef, attestation.data, attestation.aggregation_bits)
for validator in participants:
self.process_attestation(
self.backend.process_attestation(
validator,
attestation.data.beacon_block_root,
attestation.data.target.epoch)
? process_block(
self.backend, blckRef.root, blck.parent_root,
epochRef.current_justified_checkpoint.epoch, epochRef.finalized_checkpoint.epoch
epochRef.current_justified_checkpoint.epoch,
epochRef.finalized_checkpoint.epoch
)
trace "Integrating block in fork choice",
@ -305,21 +367,17 @@ proc find_head*(
return ok(new_head)
proc find_head*(self: var ForkChoice,
wallSlot: Slot): FcResult[Eth2Digest] =
template remove_alias(blck_root: Eth2Digest): Eth2Digest =
if blck_root == Eth2Digest():
self.finalizedBlock.root
else:
blck_root
self.checkpoints.maybe_update(wallSlot)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#get_head
proc get_head*(self: var ForkChoice,
dag: ChainDAGRef,
wallSlot: Slot): FcResult[Eth2Digest] =
? self.update_time(dag, wallSlot)
self.backend.find_head(
self.checkpoints.current.justified.epochRef.epoch,
remove_alias(self.checkpoints.current.justified.blck.root),
self.checkpoints.current.finalized.epoch,
self.checkpoints.current.justified.epochRef.effective_balances,
self.checkpoints.justified.epochRef.epoch,
self.checkpoints.justified.blck.root,
self.checkpoints.finalized.epoch,
self.checkpoints.justified.epochRef.effective_balances,
)
func maybe_prune*(
@ -329,8 +387,7 @@ func maybe_prune*(
self.proto_array.maybe_prune(finalized_root)
func prune*(self: var ForkChoice): FcResult[void] =
let finalized_root = self.checkpoints.current.finalized.root
self.backend.maybe_prune(finalized_root)
self.backend.maybe_prune(self.checkpoints.finalized.root)
func compute_deltas(
deltas: var openarray[Delta],

View File

@ -34,7 +34,7 @@ type
## Fork Choice Error Kinds
fcFinalizedNodeUnknown
fcJustifiedNodeUnknown
fcInvalidFinalizedRootCHange
fcInvalidFinalizedRootChange
fcInvalidNodeIndex
fcInvalidParentIndex
fcInvalidBestChildIndex
@ -47,10 +47,20 @@ type
fcInvalidDeltaLen
fcRevertedFinalizedEpoch
fcInvalidBestNode
fcInconsistentTick
# -------------------------
# TODO: Extra error modes beyond Proto/Lighthouse to be reviewed
fcUnknownParent
AttErrorKind* = enum
attFromFuture
attFromPast
attBadTargetEpoch
attUnkownTarget
attUnknownBlock
attWrongTarget
attFutureSlot
FcUnderflowKind* = enum
## Fork Choice Overflow Kinds
fcUnderflowIndices = "Indices Overflow"
@ -58,15 +68,16 @@ type
fcUnderflowBestDescendant = "Best Descendant Overflow"
Index* = int
Delta* = int
## Delta indices
Delta* = int64
## Delta balances
ForkChoiceError* = object
case kind*: fcKind
of fcFinalizedNodeUnknown,
fcJustifiedNodeUnknown:
block_root*: Eth2Digest
of fcInvalidFinalizedRootChange:
of fcInvalidFinalizedRootChange,
fcInconsistentTick:
discard
of fcInvalidNodeIndex,
fcInvalidParentIndex,
@ -118,14 +129,11 @@ type
blck*: BlockRef
epochRef*: EpochRef
FFGCheckpoints* = object
Checkpoints* = object
time*: Slot
justified*: BalanceCheckpoint
finalized*: Checkpoint
Checkpoints* = object
current*: FFGCheckpoints
best*: FFGCheckpoints
updateAt*: Option[Epoch]
best_justified*: Checkpoint
# Fork choice high-level types
# ----------------------------------------------------------------------
@ -141,10 +149,17 @@ type
votes*: seq[VoteTracker]
balances*: seq[Gwei]
QueuedAttestation* = object
slot*: Slot
attesting_indices*: seq[ValidatorIndex]
block_root*: Eth2Digest
target_epoch*: Epoch
ForkChoice* = object
backend*: ForkChoiceBackend
checkpoints*: Checkpoints
finalizedBlock*: BlockRef ## Any finalized block used at startup
queuedAttestations*: seq[QueuedAttestation]
func shortlog*(vote: VoteTracker): auto =
(

View File

@ -237,7 +237,7 @@ suiteReport "Attestation pool processing" & preset():
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
bc1 = get_beacon_committee(
state.data.data, state.data.data.slot, 1.CommitteeIndex, cache)
state.data.data, state.data.data.slot - 1, 1.CommitteeIndex, cache)
attestation0 = makeAttestation(state.data.data, b10.root, bc1[0], cache)
pool[].addAttestation(attestation0, attestation0.data.slot)