From 79ff4f7c41eeec0d94fc9b357bb128ee9e3eaf1b Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 20:36:13 +0200 Subject: [PATCH] fork choice refresh (#1520) * add attestation processing queue so attestations don't get processed too early * rework justified slot delay to match spec / lighthouse better * keep less state in fork choice * request epochref less --- beacon_chain/attestation_pool.nim | 20 +- beacon_chain/fork_choice/fork_choice.nim | 237 +++++++++++------- .../fork_choice/fork_choice_types.nim | 35 ++- tests/test_attestation_pool.nim | 2 +- 4 files changed, 185 insertions(+), 109 deletions(-) diff --git a/beacon_chain/attestation_pool.nim b/beacon_chain/attestation_pool.nim index 3f7284adc..4490ddae2 100644 --- a/beacon_chain/attestation_pool.nim +++ b/beacon_chain/attestation_pool.nim @@ -68,12 +68,14 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin forkChoice: forkChoice ) -func processAttestation( - pool: var AttestationPool, participants: HashSet[ValidatorIndex], - block_root: Eth2Digest, target_epoch: Epoch) = +proc processAttestation( + pool: var AttestationPool, slot: Slot, participants: HashSet[ValidatorIndex], + block_root: Eth2Digest, target: Checkpoint, wallSlot: Slot) = # Add attestation votes to fork choice - for validator in participants: - pool.forkChoice.process_attestation(validator, block_root, target_epoch) + if (let v = pool.forkChoice.on_attestation( + pool.chainDag, slot, block_root, toSeq(participants), target, wallSlot); + v.isErr): + warn "Couldn't process attestation", err = v.error() func addUnresolved*(pool: var AttestationPool, attestation: Attestation) = pool.unresolved[attestation.data.beacon_block_root] = @@ -160,7 +162,8 @@ proc addResolved( a.validations.add(validation) pool.processAttestation( - participants, a.blck.root, attestation.data.target.epoch) + attestation.data.slot, participants, attestation.data.beacon_block_root, + attestation.data.target, wallSlot) info "Attestation resolved", attestation = shortLog(attestation), @@ -178,7 +181,8 @@ proc addResolved( validations: @[validation] )) pool.processAttestation( - participants, blck.root, attestation.data.target.epoch) + attestation.data.slot, participants, attestation.data.beacon_block_root, + attestation.data.target, wallSlot) info "Attestation resolved", attestation = shortLog(attestation), @@ -346,7 +350,7 @@ proc resolve*(pool: var AttestationPool, wallSlot: Slot) = pool.addResolved(a.blck, a.attestation, wallSlot) proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef = - let newHead = pool.forkChoice.find_head(wallSlot) + let newHead = pool.forkChoice.get_head(pool.chainDag, wallSlot) if newHead.isErr: error "Couldn't select head", err = newHead.error diff --git a/beacon_chain/fork_choice/fork_choice.nim b/beacon_chain/fork_choice/fork_choice.nim index 152170f7c..4d7b71375 100644 --- a/beacon_chain/fork_choice/fork_choice.nim +++ b/beacon_chain/fork_choice/fork_choice.nim @@ -80,11 +80,11 @@ proc initForkChoice*(finalizedState: StateData, let finalized_epoch = finalizedState.data.data.get_current_epoch() - let ffgCheckpoint = FFGCheckpoints( - justified: BalanceCheckpoint( - blck: finalizedState.blck, - epochRef: epochRef), - finalized: Checkpoint(root: finalizedState.blck.root, epoch: finalized_epoch)) + let + justified = BalanceCheckpoint( + blck: finalizedState.blck, epochRef: epochRef) + finalized = Checkpoint( + root: finalizedState.blck.root, epoch: finalized_epoch) let backend = ? initForkChoiceBackend( finalized_epoch, finalized_epoch, finalizedState.blck.root) @@ -92,9 +92,10 @@ proc initForkChoice*(finalizedState: StateData, ok(ForkChoice( backend: backend, checkpoints: Checkpoints( - current: ffgCheckpoint, - best: ffgCheckpoint), - finalizedBlock: finalizedState.blck, + justified: justified, + best_justified: + Checkpoint(root: justified.blck.root, epoch: justified.epochRef.epoch), + finalized: finalized) )) func extend[T](s: var seq[T], minLen: int) = @@ -104,6 +105,38 @@ func extend[T](s: var seq[T], minLen: int) = if s.len < minLen: s.setLen(minLen) +proc compute_slots_since_epoch_start(slot: Slot): uint64 = + slot - slot.epoch().compute_start_slot_at_epoch() + + +proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void] = + if self.time > time: + return err(ForkChoiceError(kind: fcInconsistentTick)) + + let newEpoch = self.time.epoch() != time.epoch() + self.time = time + + if newEpoch and + self.best_justified.epoch > self.justified.epochRef.epoch: + let blck = dag.getRef(self.best_justified.root) + if blck.isNil: + return err(ForkChoiceError( + kind: fcJustifiedNodeUnknown, block_root: self.best_justified.root)) + + self.justified = BalanceCheckpoint( + blck: blck, + epochRef: dag.getEpochRef(blck, self.best_justified.epoch)) + ok() + +proc process_attestation_queue(self: var ForkChoice) {.gcsafe.} +proc update_time(self: var ForkChoice, dag: ChainDAGRef, time: Slot): FcResult[void] = + while time > self.checkpoints.time: + ? on_tick(self.checkpoints, dag, self.checkpoints.time + 1) + + self.process_attestation_queue() + + ok() + func process_attestation*( self: var ForkChoiceBackend, validator_index: ValidatorIndex, @@ -129,13 +162,19 @@ func process_attestation*( validator_index = validator_index, new_vote = shortLog(vote) -func process_attestation*( - self: var ForkChoice, - validator_index: ValidatorIndex, - block_root: Eth2Digest, - target_epoch: Epoch - ) = - self.backend.process_attestation(validator_index, block_root, target_epoch) +proc process_attestation_queue(self: var ForkChoice) = + var + keep: seq[QueuedAttestation] + for attestation in self.queuedAttestations: + if attestation.slot < self.checkpoints.time: + for validator_index in attestation.attesting_indices: + self.backend.process_attestation( + validator_index, attestation.block_root, + attestation.target_epoch) + else: + keep.add attestation + + self.queuedAttestations = keep func contains*(self: ForkChoiceBackend, block_root: Eth2Digest): bool = ## Returns `true` if a block is known to the fork choice @@ -144,10 +183,64 @@ func contains*(self: ForkChoiceBackend, block_root: Eth2Digest): bool = ## In particular, before adding a block, its parent must be known to the fork choice self.proto_array.indices.contains(block_root) +# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#on_attestation +proc on_attestation*( + self: var ForkChoice, + dag: ChainDAGRef, + slot: Slot, + beacon_block_root: Eth2Digest, + attesting_indices: openArray[ValidatorIndex], + target: Checkpoint, + wallSlot: Slot + ): FcResult[void] = + ? self.update_time(dag, wallSlot) + + if beacon_block_root == Eth2Digest(): + return ok() + + if slot < self.checkpoints.time: + for validator_index in attesting_indices: + self.backend.process_attestation( + validator_index, beacon_block_root, target.epoch) + else: + self.queued_attestations.add(QueuedAttestation( + slot: slot, + attesting_indices: @attesting_indices, + block_root: beacon_block_root, + target_epoch: target.epoch)) + ok() + +# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#should_update_justified_checkpoint +proc should_update_justified_checkpoint( + self: var Checkpoints, + dag: ChainDAGRef, + epochRef: EpochRef): FcResult[bool] = + if compute_slots_since_epoch_start(self.time) < SAFE_SLOTS_TO_UPDATE_JUSTIFIED: + return ok(true) + + let + justified_slot = compute_start_slot_at_epoch(self.justified.epochRef.epoch) + + let new_justified_checkpoint = epochRef.current_justified_checkpoint; + + let justified_blck = dag.getRef(new_justified_checkpoint.root) + + if justified_blck.isNil: + return err(ForkChoiceError( + kind: fcJustifiedNodeUnknown, block_root: new_justified_checkpoint.root)) + + let justified_ancestor = + justified_blck.atSlot(justified_slot) + + if justified_ancestor.blck.root != self.justified.blck.root: + return ok(false) + + ok(true) + proc process_state(self: var Checkpoints, dag: ChainDAGRef, epochRef: EpochRef, - blck: BlockRef) = + blck: BlockRef): FcResult[void] = let state_justified_epoch = epochRef.current_justified_checkpoint.epoch state_finalized_epoch = epochRef.finalized_checkpoint.epoch @@ -155,70 +248,39 @@ proc process_state(self: var Checkpoints, trace "Processing epoch", epoch = epochRef.epoch, state_justified_epoch = state_justified_epoch, - current_justified = self.current.justified.epochRef.epoch, + current_justified = self.justified.epochRef.epoch, state_finalized_epoch = state_finalized_epoch, - current_finalized = self.current.finalized + current_finalized = self.finalized.epoch - if (state_justified_epoch > self.current.justified.epochRef.epoch) and - (state_finalized_epoch >= self.current.finalized.epoch): - let justifiedBlck = blck.atEpochStart(state_justified_epoch) + if state_justified_epoch > self.justified.epochRef.epoch: + if state_justified_epoch > self.best_justified.epoch: + self.best_justified = epochRef.current_justified_checkpoint - doAssert justifiedBlck.blck.root == epochRef.current_justified_checkpoint.root + if ? should_update_justified_checkpoint(self, dag, epochRef): + let justifiedBlck = blck.atEpochStart(state_justified_epoch) - let candidate = FFGCheckpoints( - justified: BalanceCheckpoint( + self.justified = + BalanceCheckpoint( blck: justifiedBlck.blck, - epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch), - ), - finalized: epochRef.finalized_checkpoint, - ) + epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)) - trace "Applying candidate", - justified_block = shortLog(candidate.justified.blck), - justified_epoch = shortLog(candidate.justified.epochRef.epoch), - finalized = candidate.finalized, - state_finalized = state_finalized_epoch + if state_finalized_epoch > self.finalized.epoch: + self.finalized = epochRef.finalized_checkpoint - if self.current.justified.blck.isAncestorOf(justifiedBlck.blck): - trace "Updating current", - prev = shortLog(self.current.justified.blck) - self.current = candidate - else: - trace "No current update", - prev = shortLog(self.current.justified.blck) + if self.justified.epochRef.epoch != state_justified_epoch or + self.justified.blck.root != epochRef.current_justified_checkpoint.root: - if candidate.justified.epochRef.epoch > self.best.justified.epochRef.epoch: - trace "Updating best", - prev = shortLog(self.best.justified.blck) - self.best = candidate - else: - trace "No best update", - prev = shortLog(self.best.justified.blck) + if (state_justified_epoch > self.justified.epochRef.epoch) or + (self.justified.blck.atEpochStart(self.finalized.epoch).blck.root != + self.finalized.root): - # self.balances_cache.process_state(block_root, state)?; + let justifiedBlck = blck.atEpochStart(state_justified_epoch) -func compute_slots_since_epoch_start(slot: Slot): uint64 = - slot - compute_start_slot_at_epoch(compute_epoch_at_slot(slot)) - -proc maybe_update(self: var Checkpoints, current_slot: Slot) = - trace "Updating checkpoint", - current_slot, - best = shortLog(self.best.justified.blck), - current = shortLog(self.current.justified.blck), - updateAt = self.updateAt - - if self.best.justified.epochRef.epoch > self.current.justified.epochRef.epoch: - let current_epoch = current_slot.compute_epoch_at_slot() - - if self.update_at.isNone(): - if self.best.justified.epochRef.epoch > self.current.justified.epochRef.epoch: - if compute_slots_since_epoch_start(current_slot) < SAFE_SLOTS_TO_UPDATE_JUSTIFIED: - self.current = self.best - else: - self.update_at = some(current_epoch + 1) - elif self.updateAt.get() <= current_epoch: - self.current = self.best - self.update_at = none(Epoch) + self.justified = + BalanceCheckpoint( + blck: justifiedBlck.blck, + epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)) + ok() proc process_block*(self: var ForkChoiceBackend, block_root: Eth2Digest, @@ -235,9 +297,8 @@ proc process_block*(self: var ForkChoice, blckRef: BlockRef, blck: SomeBeaconBlock, wallSlot: Slot): FcResult[void] = - process_state(self.checkpoints, dag, epochRef, blckRef) - - maybe_update(self.checkpoints, wallSlot) + ? update_time(self, dag, wallSlot) + ? process_state(self.checkpoints, dag, epochRef, blckRef) for attestation in blck.body.attestations: let targetBlck = dag.getRef(attestation.data.target.root) @@ -251,14 +312,15 @@ proc process_block*(self: var ForkChoice, epochRef, attestation.data, attestation.aggregation_bits) for validator in participants: - self.process_attestation( + self.backend.process_attestation( validator, attestation.data.beacon_block_root, attestation.data.target.epoch) ? process_block( self.backend, blckRef.root, blck.parent_root, - epochRef.current_justified_checkpoint.epoch, epochRef.finalized_checkpoint.epoch + epochRef.current_justified_checkpoint.epoch, + epochRef.finalized_checkpoint.epoch ) trace "Integrating block in fork choice", @@ -305,21 +367,17 @@ proc find_head*( return ok(new_head) -proc find_head*(self: var ForkChoice, - wallSlot: Slot): FcResult[Eth2Digest] = - template remove_alias(blck_root: Eth2Digest): Eth2Digest = - if blck_root == Eth2Digest(): - self.finalizedBlock.root - else: - blck_root - - self.checkpoints.maybe_update(wallSlot) +# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#get_head +proc get_head*(self: var ForkChoice, + dag: ChainDAGRef, + wallSlot: Slot): FcResult[Eth2Digest] = + ? self.update_time(dag, wallSlot) self.backend.find_head( - self.checkpoints.current.justified.epochRef.epoch, - remove_alias(self.checkpoints.current.justified.blck.root), - self.checkpoints.current.finalized.epoch, - self.checkpoints.current.justified.epochRef.effective_balances, + self.checkpoints.justified.epochRef.epoch, + self.checkpoints.justified.blck.root, + self.checkpoints.finalized.epoch, + self.checkpoints.justified.epochRef.effective_balances, ) func maybe_prune*( @@ -329,8 +387,7 @@ func maybe_prune*( self.proto_array.maybe_prune(finalized_root) func prune*(self: var ForkChoice): FcResult[void] = - let finalized_root = self.checkpoints.current.finalized.root - self.backend.maybe_prune(finalized_root) + self.backend.maybe_prune(self.checkpoints.finalized.root) func compute_deltas( deltas: var openarray[Delta], diff --git a/beacon_chain/fork_choice/fork_choice_types.nim b/beacon_chain/fork_choice/fork_choice_types.nim index 13ae6dea5..4592c8e5e 100644 --- a/beacon_chain/fork_choice/fork_choice_types.nim +++ b/beacon_chain/fork_choice/fork_choice_types.nim @@ -34,7 +34,7 @@ type ## Fork Choice Error Kinds fcFinalizedNodeUnknown fcJustifiedNodeUnknown - fcInvalidFinalizedRootCHange + fcInvalidFinalizedRootChange fcInvalidNodeIndex fcInvalidParentIndex fcInvalidBestChildIndex @@ -47,10 +47,20 @@ type fcInvalidDeltaLen fcRevertedFinalizedEpoch fcInvalidBestNode + fcInconsistentTick # ------------------------- # TODO: Extra error modes beyond Proto/Lighthouse to be reviewed fcUnknownParent + AttErrorKind* = enum + attFromFuture + attFromPast + attBadTargetEpoch + attUnkownTarget + attUnknownBlock + attWrongTarget + attFutureSlot + FcUnderflowKind* = enum ## Fork Choice Overflow Kinds fcUnderflowIndices = "Indices Overflow" @@ -58,15 +68,16 @@ type fcUnderflowBestDescendant = "Best Descendant Overflow" Index* = int - Delta* = int - ## Delta indices + Delta* = int64 + ## Delta balances ForkChoiceError* = object case kind*: fcKind of fcFinalizedNodeUnknown, fcJustifiedNodeUnknown: block_root*: Eth2Digest - of fcInvalidFinalizedRootChange: + of fcInvalidFinalizedRootChange, + fcInconsistentTick: discard of fcInvalidNodeIndex, fcInvalidParentIndex, @@ -118,14 +129,11 @@ type blck*: BlockRef epochRef*: EpochRef - FFGCheckpoints* = object + Checkpoints* = object + time*: Slot justified*: BalanceCheckpoint finalized*: Checkpoint - - Checkpoints* = object - current*: FFGCheckpoints - best*: FFGCheckpoints - updateAt*: Option[Epoch] + best_justified*: Checkpoint # Fork choice high-level types # ---------------------------------------------------------------------- @@ -141,10 +149,17 @@ type votes*: seq[VoteTracker] balances*: seq[Gwei] + QueuedAttestation* = object + slot*: Slot + attesting_indices*: seq[ValidatorIndex] + block_root*: Eth2Digest + target_epoch*: Epoch + ForkChoice* = object backend*: ForkChoiceBackend checkpoints*: Checkpoints finalizedBlock*: BlockRef ## Any finalized block used at startup + queuedAttestations*: seq[QueuedAttestation] func shortlog*(vote: VoteTracker): auto = ( diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index 64e231d27..7c7671905 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -237,7 +237,7 @@ suiteReport "Attestation pool processing" & preset(): pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot) bc1 = get_beacon_committee( - state.data.data, state.data.data.slot, 1.CommitteeIndex, cache) + state.data.data, state.data.data.slot - 1, 1.CommitteeIndex, cache) attestation0 = makeAttestation(state.data.data, b10.root, bc1[0], cache) pool[].addAttestation(attestation0, attestation0.data.slot)