# beacon_chain
# Copyright (c) 2018-2020 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
  # Standard library
  std/[os, tables, sequtils, osproc, streams],

  # Nimble packages
  stew/[objects], stew/shims/macros,
  chronos, metrics, json_rpc/[rpcserver, jsonmarshal],
  chronicles,
  json_serialization/std/[options, sets, net], serialization/errors,
  eth/db/kvstore,
  eth/[keys, async_utils], eth/p2p/discoveryv5/[protocol, enr],

  # Local modules
  spec/[datatypes, digest, crypto, helpers, validator, network, signatures],
  spec/state_transition,
  conf, time, validator_pool,
  attestation_pool, block_pools/[spec_cache, chain_dag, clearance],
  eth2_network, keystore_management, beacon_node_common, beacon_node_types,
  nimbus_binary_common, mainchain_monitor, version, ssz/merkleization, interop,
  attestation_aggregation, sync_manager, sszdump,
  validator_slashing_protection

# Metrics for tracking attestation and beacon block loss.
# Each counter is declared with a name and a human-readable help string.
declareCounter beacon_attestations_sent,
  "Number of beacon chain attestations sent by this peer"
declareCounter beacon_blocks_proposed,
  "Number of beacon chain blocks sent by this peer"

# Log topic used for this module's log statements.
logScope: topics = "beacval"

# TODO: The procedure below follows an insecure scheme: it creates the
# directory without any permissions and writes the file without any
# permissions.
proc saveValidatorKey*(keyName, key: string, conf: BeaconNodeConf) =
  ## Writes `key` to a file called `keyName` inside `conf.validatorsDir`,
  ## creating that directory first, and logs the path that was written.
  ## Neither `createDir` nor `writeFile` is given explicit permissions here.
  let validatorsDir = conf.validatorsDir
  let outputFile = validatorsDir / keyName
  createDir validatorsDir
  writeFile(outputFile, key)
  notice "Imported validator key", file = outputFile

proc checkValidatorInRegistry(state: BeaconState,
                              pubKey: ValidatorPubKey) =
  ## Logs a warning when `pubKey` is not found among `state.validators`.
  ## Has no other effect and never rejects the key.
  let idx = state.validators.asSeq.findIt(it.pubKey == pubKey)
  if idx == -1:
    # We allow adding a validator even if its key is not in the state registry:
    # it might be that the deposit for this validator has not yet been processed
    warn "Validator not in registry (yet?)", pubKey

proc addLocalValidator*(node: BeaconNode,
                        state: BeaconState,
                        privKey: ValidatorPrivKey) =
  ## Derives the public key from `privKey`, runs the registry check against
  ## `state` (warning only), and registers the key pair with
  ## `node.attachedValidators`.
  let pubKey = privKey.toPubKey()
  state.checkValidatorInRegistry(pubKey)
  node.attachedValidators.addLocalValidator(pubKey, privKey)

proc addLocalValidators*(node: BeaconNode) =
  ## Attaches every key yielded by `node.config.validatorKeys`, checking each
  ## one against the current head state, then logs the attached count.
  for validatorKey in node.config.validatorKeys:
    node.addLocalValidator node.chainDag.headState.data.data, validatorKey

  notice "Local validators attached ", count = node.attachedValidators.count

proc addRemoteValidators*(node: BeaconNode) =
  ## Reads public keys, one hex string per line, from the output stream of
  ## `node.vcProcess` and registers each one as a remote validator that shares
  ## the process's input/output streams. Reading stops once the line `end` is
  ## seen or the process is no longer running.
  # load all the validators from the child process - loop until `end`
  var line = newStringOfCap(120).TaintedString
  while line != "end" and running(node.vcProcess):
    if node.vcProcess.outputStream.readLine(line) and line != "end":
      # NOTE(review): `.get()` is applied to the `fromHex` result without a
      # prior check - presumably a malformed line fails here; confirm.
      let key = ValidatorPubKey.fromHex(line).get().initPubKey()
      node.chainDag.headState.data.data.checkValidatorInRegistry(key)

      let v = AttachedValidator(pubKey: key,
                                kind: ValidatorKind.remote,
                                connection: ValidatorConnection(
                                  inStream: node.vcProcess.inputStream,
                                  outStream: node.vcProcess.outputStream,
                                  pubKeyStr: $key))
      node.attachedValidators.addRemoteValidator(key, v)
  notice "Remote validators attached ", count = node.attachedValidators.count

proc getAttachedValidator*(node: BeaconNode,
                           pubkey: ValidatorPubKey): AttachedValidator =
  ## Looks `pubkey` up in `node.attachedValidators` and returns whatever
  ## `getValidator` yields for it.
  node.attachedValidators.getValidator(pubkey)

proc getAttachedValidator*(node: BeaconNode,
                           state: BeaconState,
                           idx: ValidatorIndex): AttachedValidator =
  ## Resolves validator index `idx` to a public key via `state.validators` and
  ## looks that key up among the attached validators. When `idx` is not below
  ## the number of validators in `state`, a warning is logged and `nil` is
  ## returned.
  if idx < state.validators.len.ValidatorIndex:
    node.getAttachedValidator(state.validators[idx].pubkey)
  else:
    warn "Validator index out of bounds",
      idx, stateSlot = state.slot, validators = state.validators.len
    nil

proc getAttachedValidator*(node: BeaconNode,
                           epochRef: EpochRef,
                           idx: ValidatorIndex): AttachedValidator =
  ## Same lookup as the `BeaconState` overload, but resolves `idx` through
  ## `epochRef.validator_keys`. Logs a warning and returns `nil` when `idx` is
  ## not below the number of keys.
  if idx < epochRef.validator_keys.len.ValidatorIndex:
    node.getAttachedValidator(epochRef.validator_keys[idx])
  else:
    warn "Validator index out of bounds",
      idx, epoch = epochRef.epoch, validators = epochRef.validator_keys.len
    nil

proc isSynced*(node: BeaconNode, head: BlockRef): bool =
  ## Returns `false` when the wall clock is past genesis and `head` is more
  ## than `MaxEmptySlotCount` slots behind the wall-clock slot; returns `true`
  ## otherwise.
  ##
  ## TODO This function is here as a placeholder for some better heuristics to
  ##      determine if we're in sync and should be producing blocks and
  ##      attestations. Generally, the problem is that slot time keeps advancing
  ##      even when there are no blocks being produced, so there's no way to
  ##      distinguish validators genuinely going missing from the node not being
  ##      well connected (during a network split or an internet outage for
  ##      example). It would generally be correct to simply keep running as if
  ##      we were the only legit node left alive, but then we run into issues:
  ##      with enough empty slots, the validator pool is emptied leading
  ##      to empty committees and lots of empty slot processing that will be
  ##      thrown away as soon as we're synced again.

  let
    # The slot we should be at, according to the clock
    beaconTime = node.beaconClock.now()
    wallSlot = beaconTime.toSlot()

  # TODO: MaxEmptySlotCount should likely involve the weak subjectivity period.

  # TODO if everyone follows this logic, the network will not recover from a
  #      halt: nobody will be producing blocks because everyone expects someone
  #      else to do it
  if wallSlot.afterGenesis and head.slot + MaxEmptySlotCount < wallSlot.slot:
    false
  else:
    true

proc sendAttestation*(
    node: BeaconNode, attestation: Attestation, num_active_validators: uint64) =
  ## Broadcasts `attestation` on the topic computed by `getAttestationTopic`
  ## from the node's fork digest, the attestation and
  ## `num_active_validators`, then increments `beacon_attestations_sent`.
  node.network.broadcast(
    getAttestationTopic(node.forkDigest, attestation, num_active_validators),
    attestation)

  beacon_attestations_sent.inc()

proc sendAttestation*(node: BeaconNode, attestation: Attestation) =
  ## Variant that derives the active validator count itself: it looks up the
  ## block named by `attestation.data.beacon_block_root` and counts the active
  ## validators of the epoch ref for the attestation's target epoch. When the
  ## block is not known, a debug message is logged and nothing is sent.
  # For the validator API, which doesn't supply num_active_validators.
  let attestationBlck =
    node.chainDag.getRef(attestation.data.beacon_block_root)
  if attestationBlck.isNil:
    debug "Attempt to send attestation without corresponding block"
    return

  node.sendAttestation(
    attestation,
    count_active_validators(
      node.chainDag.getEpochRef(attestationBlck, attestation.data.target.epoch)))

proc createAndSendAttestation(node: BeaconNode,
                              fork: Fork,
                              genesis_validators_root: Eth2Digest,
                              validator: AttachedValidator,
                              attestationData: AttestationData,
                              committeeLen: int,
                              indexInCommittee: int,
                              num_active_validators: uint64) {.async.} =
  ## Asks `validator` to produce and sign an attestation for
  ## `attestationData`, broadcasts it through `sendAttestation`, optionally
  ## dumps it to `node.config.dumpDirOutgoing` (when `dumpEnabled` is set) and
  ## logs the result.
  var attestation = await validator.produceAndSignAttestation(
    attestationData, committeeLen, indexInCommittee, fork,
    genesis_validators_root)

  node.sendAttestation(attestation, num_active_validators)

  if node.config.dumpEnabled:
    dump(node.config.dumpDirOutgoing, attestation.data, validator.pubKey)

  notice "Attestation sent",
    attestation = shortLog(attestation),
    validator = shortLog(validator),
    indexInCommittee = indexInCommittee

type
  # Discriminator for `ValidatorInfoForMakeBeaconBlock`.
  ValidatorInfoForMakeBeaconBlockKind* = enum
    viValidator
    viRandao_reveal
  # Input for `makeBeaconBlockForHeadAndSlot`: carries either an attached
  # validator (from which the randao reveal is generated) or an already
  # available randao reveal.
  ValidatorInfoForMakeBeaconBlock* = object
    case kind*: ValidatorInfoForMakeBeaconBlockKind
    of viValidator: validator*: AttachedValidator
    of viRandao_reveal: randao_reveal*: ValidatorSig

proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
                                    val_info: ValidatorInfoForMakeBeaconBlock,
                                    validator_index: ValidatorIndex,
                                    graffiti: GraffitiBytes,
                                    head: BlockRef,
                                    slot: Slot):
    Future[tuple[message: Option[BeaconBlock], fork: Fork,
                 genesis_validators_root: Eth2Digest]] {.async.} =
  ## Builds (but does not sign) a beacon block for `slot` on top of `head`.
  ##
  ## Returns the result of `makeBeaconBlock` together with the `fork` and
  ## `genesis_validators_root` read from the state used to build it.
  ##
  ## NOTE(review): `state`, `hashedState` and `cache` are not declared in this
  ## proc; presumably they are injected by the `withState` template - confirm.

  # Advance state to the slot that we're proposing for - this is the equivalent
  # of running `process_slots` up to the slot of the new block.
  node.chainDag.withState(
      node.chainDag.tmpState, head.atSlot(slot)):
    # Without a mainchain monitor, fall back to stub eth1 data and no deposits.
    let (eth1data, deposits) =
      if node.mainchainMonitor.isNil:
        (get_eth1data_stub(state.eth1_deposit_index, slot.compute_epoch_at_slot()),
         newSeq[Deposit]())
      else:
        node.mainchainMonitor.getBlockProposalData(state)

    # TODO perhaps just making the enclosing function accept 2 different types at the
    # same time and doing some compile-time branching logic is cleaner (without the
    # need for the discriminated union)... but we need the `state` from `withState`
    # in order to get the fork/root for the specific head/slot for the randao_reveal
    # and it's causing problems when the function becomes a generic for 2 types...
    proc getRandaoReveal(val_info: ValidatorInfoForMakeBeaconBlock):
        Future[ValidatorSig] {.async.} =
      # Either have the attached validator generate the reveal for `slot`, or
      # hand back the reveal that was supplied by the caller.
      if val_info.kind == viValidator:
        return await val_info.validator.genRandaoReveal(
          state.fork, state.genesis_validators_root, slot)
      elif val_info.kind == viRandao_reveal:
        return val_info.randao_reveal

    let
      poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived

    func restore(v: var HashedBeaconState) =
      # Resets `tmpState` to a copy of `headState`; only valid when called on
      # `tmpState.data` (asserted below).
      # TODO address this ugly workaround - there should probably be a
      #      `state_transition` that takes a `StateData` instead and updates
      #      the block as well
      doAssert v.addr == addr poolPtr.tmpState.data
      assign(poolPtr.tmpState, poolPtr.headState)

    let message = makeBeaconBlock(
      node.config.runtimePreset,
      hashedState,
      validator_index,
      head.root,
      await getRandaoReveal(val_info),
      eth1data,
      graffiti,
      node.attestationPool[].getAttestationsForBlock(state, cache),
      deposits,
      restore,
      cache)

    if message.isSome():
      # TODO this restore is needed because otherwise tmpState will be internally
      #      inconsistent - its blck will not be pointing to the block that
      #      created this state - we have to reset it here before `await` to avoid
      #      races.
      restore(poolPtr.tmpState.data)

    return (message, state.fork, state.genesis_validators_root)

proc proposeSignedBlock*(node: BeaconNode,
                         head: BlockRef,
                         validator: AttachedValidator,
                         newBlock: SignedBeaconBlock):
                         Future[BlockRef] {.async.} =
  ## Adds the already signed `newBlock` to the chain DAG and, on success,
  ## logs it, optionally dumps it, broadcasts it on `node.topicBeaconBlocks`
  ## and increments `beacon_blocks_proposed`.
  ##
  ## Returns the new block's `BlockRef`, or `head` unchanged when the block
  ## could not be added (a warning is logged in that case).

  let newBlockRef = node.chainDag.addRawBlock(node.quarantine, newBlock) do (
      blckRef: BlockRef, signedBlock: SignedBeaconBlock,
      epochRef: EpochRef, state: HashedBeaconState):
    # Callback: add to fork choice if valid
    node.attestationPool[].addForkChoice(
      epochRef, blckRef, signedBlock.message,
      node.beaconClock.now().slotOrZero())

  if newBlockRef.isErr:
    warn "Unable to add proposed block to block pool",
      newBlock = shortLog(newBlock.message),
      blockRoot = shortLog(newBlock.root)

    return head

  notice "Block proposed",
    blck = shortLog(newBlock.message),
    blockRoot = shortLog(newBlockRef[].root),
    validator = shortLog(validator)

  if node.config.dumpEnabled:
    dump(node.config.dumpDirOutgoing, newBlock)

  node.network.broadcast(node.topicBeaconBlocks, newBlock)

  beacon_blocks_proposed.inc()

  return newBlockRef[]

proc proposeBlock(node: BeaconNode,
                  validator: AttachedValidator,
                  validator_index: ValidatorIndex,
                  head: BlockRef,
                  slot: Slot): Future[BlockRef] {.async.} =
  ## Builds, signs and publishes a block for `slot` on behalf of `validator`.
  ##
  ## Returns `head` unchanged when `head` is already at or past `slot`, when
  ## the slashing protection check reports an error, or when no block message
  ## could be produced. Otherwise returns the result of `proposeSignedBlock`.
  if head.slot >= slot:
    # We should normally not have a head newer than the slot we're proposing for
    # but this can happen if block proposal is delayed
    warn "Skipping proposal, have newer head already",
      headSlot = shortLog(head.slot),
      headBlockRoot = shortLog(head.root),
      slot = shortLog(slot)
    return head

  let notSlashable = node.attachedValidators
    .slashingProtection
    .checkSlashableBlockProposal(validator.pubkey, slot)
  if notSlashable.isErr:
    warn "Slashing protection activated",
      validator = validator.pubkey,
      slot = slot,
      existingProposal = notSlashable.error
    return head

  let valInfo = ValidatorInfoForMakeBeaconBlock(kind: viValidator,
                                                validator: validator)
  let beaconBlockTuple = await makeBeaconBlockForHeadAndSlot(
    node, valInfo, validator_index, node.graffitiBytes, head, slot)
  if not beaconBlockTuple.message.isSome():
    return head # already logged elsewhere!
  var
    newBlock = SignedBeaconBlock(
      message: beaconBlockTuple.message.get()
    )

  newBlock.root = hash_tree_root(newBlock.message)

  # The proposal is registered with slashing protection before the signature
  # is requested from the validator.
  # TODO: recomputed in block proposal
  let signing_root = compute_block_root(
    beaconBlockTuple.fork, beaconBlockTuple.genesis_validators_root, slot, newBlock.root)
  node.attachedValidators
    .slashingProtection
    .registerBlock(validator.pubkey, slot, signing_root)

  newBlock.signature = await validator.signBlockProposal(
    beaconBlockTuple.fork, beaconBlockTuple.genesis_validators_root, slot, newBlock.root)

  return await node.proposeSignedBlock(head, validator, newBlock)

proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
  ## Perform all attestations that the validators attached to this node should
  ## perform during the given slot
  ##
  ## NOTE(review): the remainder of this proc is damaged in this copy of the
  ## file (see the note further down); only the visible part is documented.

  if slot + SLOTS_PER_EPOCH < head.slot:
    # The latest block we know about is a lot newer than the slot we're being
    # asked to attest to - this makes it unlikely that it will be included
    # at all.
    # TODO the oldest attestations allowed are those that are older than the
    #      finalized epoch.. also, it seems that posting very old attestations
    #      is risky from a slashing perspective. More work is needed here.
    warn "Skipping attestation, head is too recent",
      headSlot = shortLog(head.slot),
      slot = shortLog(slot)
    return

  let attestationHead = head.atSlot(slot)
  if head != attestationHead.blck:
    # In rare cases, such as when we're busy syncing or just slow, we'll be
    # attesting to a past state - we must then recreate the world as it looked
    # like back then
    notice "Attesting to a state in the past, falling behind?",
      headSlot = shortLog(head.slot),
      attestationHeadSlot = shortLog(attestationHead.slot),
      attestationSlot = shortLog(slot)

  trace "Checking attestations",
    attestationHeadRoot = shortLog(attestationHead.blck.root),
    attestationSlot = shortLog(slot)

  # Collect data to send before node.stateCache grows stale
  var attestations: seq[tuple[
    data: AttestationData, committeeLen, indexInCommittee: int,
    validator: AttachedValidator]]

  # We need to run attestations exactly for the slot that we're attesting to.
  # In case blocks went missing, this means advancing past the latest block
  # using empty slots as fillers.
  # https://github.com/ethereum/eth2.0-specs/blob/v0.12.3/specs/phase0/validator.md#validator-assignments
  let
    epochRef = node.chainDag.getEpochRef(
      attestationHead.blck, slot.compute_epoch_at_slot())
    committees_per_slot =
      get_committee_count_per_slot(epochRef)
    num_active_validators = count_active_validators(epochRef)
    fork = node.chainDag.headState.data.data.fork
    genesis_validators_root =
      node.chainDag.headState.data.data.genesis_validators_root

  # NOTE(review): source text appears to have been lost between `0'u64..` and
  # `2:` on the next line. The statements that follow use `await`, while
  # `handleAttestations` is not declared `{.async.}`, so they presumably
  # belong to a later proc whose header is missing. The nesting shown below is
  # a best-effort layout only - restore this span from version control before
  # relying on it.
  for committee_index in 0'u64.. 2:
    sleepToSlotOffsetWithHeadUpdate(
      seconds(int64(SECONDS_PER_SLOT * 2) div 3),
      "Waiting to aggregate attestations")

    const TRAILING_DISTANCE = 1
    # https://github.com/ethereum/eth2.0-specs/blob/v0.12.3/specs/phase0/p2p-interface.md#configuration
    static:
      doAssert TRAILING_DISTANCE <= ATTESTATION_PROPAGATION_SLOT_RANGE

    let
      aggregationSlot = slot - TRAILING_DISTANCE
      aggregationHead = get_ancestor(head, aggregationSlot)

    await broadcastAggregatedAttestations(
      node, aggregationHead, aggregationSlot)