From e22248bca1fd0566b3c55a705ef98bc1fd4b14fc Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Thu, 19 Nov 2020 19:19:03 +0200 Subject: [PATCH] Tentative fix for the Pyrmont Eth1 voting issue --- beacon_chain/eth1_monitor.nim | 135 +++++++++++++++++++----------- beacon_chain/rpc/debug_api.nim | 18 +++- beacon_chain/spec/datatypes.nim | 2 + beacon_chain/validator_duties.nim | 20 +++-- 4 files changed, 117 insertions(+), 58 deletions(-) diff --git a/beacon_chain/eth1_monitor.nim b/beacon_chain/eth1_monitor.nim index 49a8cde68..6a5882491 100644 --- a/beacon_chain/eth1_monitor.nim +++ b/beacon_chain/eth1_monitor.nim @@ -8,6 +8,9 @@ import export web3Types +logScope: + topics = "eth1" + contract(DepositContract): proc deposit(pubkey: Bytes48, withdrawalCredentials: Bytes32, @@ -70,7 +73,6 @@ type Web3DataProviderRef* = ref Web3DataProvider - ReorgDepthLimitExceeded = object of CatchableError CorruptDataProvider = object of CatchableError DisconnectHandler* = proc () {.gcsafe, raises: [Defect].} @@ -81,9 +83,13 @@ type amount: Bytes8, signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode) {.raises: [Defect], gcsafe.} + BlockProposalEth1Data* = object + vote*: Eth1Data + deposits*: seq[Deposit] + const web3Timeouts = 5.seconds - hasDepositRootChecks = defined(with_deposit_root_checks) + hasDepositRootChecks = true # defined(with_deposit_root_checks) template depositContractAddress(m: Eth1Monitor): Eth1Address = m.dataProvider.ns.contractAddress @@ -91,6 +97,9 @@ template depositContractAddress(m: Eth1Monitor): Eth1Address = template web3Url(m: Eth1Monitor): string = m.dataProvider.url +template blocks*(m: Eth1Monitor): Deque[Eth1Block] = + m.eth1Chain.blocks + proc fixupWeb3Urls*(web3Url: var string) = ## Converts HTTP and HTTPS Infura URLs to their WebSocket equivalents ## because we are missing a functional HTTPS client. 
@@ -164,6 +173,17 @@ template findBlock*(eth1Chain: Eth1Chain, hash: BlockHash): Eth1Block = template findBlock*(eth1Chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block = getOrDefault(eth1Chain.blocksByHash, asBlockHash(eth1Data.block_hash), nil) +func makeSuccessorWithoutDeposits(existingBlock: Eth1Block, + successor: BlockObject): ETh1Block = + Eth1Block( + number: Eth1BlockNumber successor.number, + timestamp: Eth1BlockTimestamp successor.timestamp, + voteData: Eth1Data( + block_hash: successor.hash.asEth2Digest, + deposit_count: existingBlock.voteData.deposit_count, + deposit_root: existingBlock.voteData.deposit_root), + activeValidatorsCount: existingBlock.activeValidatorsCount) + func latestCandidateBlock(eth1Chain: Eth1Chain, preset: RuntimePreset, periodStart: uint64): Eth1Block = @@ -180,8 +200,11 @@ func addBlock(eth1Chain: var Eth1Chain, newBlock: Eth1Block) = eth1Chain.blocks.addLast newBlock eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock +template hash*(x: Eth1Data): Hash = + hash(x.block_hash.data) + template hash*(x: Eth1Block): Hash = - hash(x.voteData.block_hash.data) + hash(x.voteData) proc close*(p: Web3DataProviderRef): Future[void] {.async.} = if p.blockHeadersSubscription != nil: @@ -197,17 +220,6 @@ proc getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] = return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false) -proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): - Future[Eth1BlockNumber] {.async.} = - try: - let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts): - return 0 - return Eth1BlockNumber(blk.number) - except CatchableError as exc: - debug "Failed to get Eth1 block number from hash", - hash = $hash, err = exc.msg - raise exc - template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped = var res: ValueType fromJson(j[fieldName], fieldName, res) @@ -318,7 +330,7 @@ func getDepositsRoot(m: DepositsMerkleizer): 
Eth2Digest = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#get_eth1_data proc getBlockProposalData*(m: Eth1Monitor, state: BeaconState, - finalizedEth1Data: Eth1Data): (Eth1Data, seq[Deposit]) = + finalizedEth1Data: Eth1Data): BlockProposalEth1Data = var pendingDepositsCount = state.eth1_data.deposit_count - state.eth1_deposit_index @@ -347,26 +359,30 @@ proc getBlockProposalData*(m: Eth1Monitor, let periodStart = voting_period_start_time(state) - var otherVotesCountTable = initCountTable[Eth1Block]() + var otherVotesCountTable = initCountTable[Eth1Data]() for vote in state.eth1_data_votes: + # TODO(zah) + # There is a slight deviation from the spec here to deal with the following + # problem: the in-memory database of eth1 blocks for a restarted node will + # be empty which will lead to a "no change" vote. To fix this, we'll need to + # add rolling persistence for all potentially voted on blocks. let eth1Block = m.eth1Chain.findBlock(vote) - if eth1Block != nil and - is_candidate_block(m.preset, eth1Block, periodStart) and - eth1Block.voteData.deposit_count > state.eth1_data.deposit_count: - otherVotesCountTable.inc eth1Block + if (eth1Block == nil or is_candidate_block(m.preset, eth1Block, periodStart)) and + vote.deposit_count > state.eth1_data.deposit_count: + otherVotesCountTable.inc vote if otherVotesCountTable.len > 0: - let (winningBlock, votes) = otherVotesCountTable.largest - result[0] = winningBlock.voteData + let (winningVote, votes) = otherVotesCountTable.largest + result.vote = winningVote if uint64((votes + 1) * 2) > SLOTS_PER_ETH1_VOTING_PERIOD: - pendingDepositsCount = winningBlock.voteData.deposit_count - + pendingDepositsCount = winningVote.deposit_count - state.eth1_deposit_index else: let latestBlock = m.eth1Chain.latestCandidateBlock(m.preset, periodStart) if latestBlock == nil: - result[0] = state.eth1_data + result.vote = state.eth1_data else: - result[0] = latestBlock.voteData + result.vote = 
latestBlock.voteData if pendingDepositsCount > 0 and hasLatestDeposits: let totalDepositsInNewBlock = min(MAX_DEPOSITS, pendingDepositsCount) @@ -389,7 +405,7 @@ proc getBlockProposalData*(m: Eth1Monitor, deposits[i].proof[32].data[0..7] = toBytesLE uint64(state.eth1_deposit_index + i + 1) - swap(result[1], deposits) + swap(result.deposits, deposits) proc new(T: type Web3DataProvider, depositContractAddress: Eth1Address, @@ -545,7 +561,17 @@ proc safeCancel(fut: var Future[void]) = proc stop*(m: Eth1Monitor) = safeCancel m.runFut -proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async.} = +const + votedBlocksSafetyMargin = 20 + +proc earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber = + m.latestEth1BlockNumber - (2 * m.preset.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin + +proc syncBlockRange(m: Eth1Monitor, + fromBlock, toBlock, + fullSyncFromBlock: Eth1BlockNumber) {.async.} = + doAssert m.eth1Chain.blocks.len > 0 + var currentBlock = fromBlock while currentBlock <= toBlock: var @@ -588,6 +614,13 @@ proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async blk.activeValidatorsCount = m.db.immutableValidatorData.lenu64 + if blk.number > fullSyncFromBlock: + let lastBlock = m.eth1Chain.blocks.peekLast + for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number: + let blockWithoutDeposits = await m.dataProvider.getBlockByNumber(n) + m.eth1Chain.addBlock( + lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits)) + m.eth1Chain.addBlock blk if eth1Blocks.len > 0: @@ -596,10 +629,14 @@ proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async when hasDepositRootChecks: let status = await m.dataProvider.fetchDepositContractData(lastBlock) - debug "Deposit root checks", status, + debug "Deposit contract state verified", status, ourCount = lastBlock.voteData.deposit_count, ourRoot = lastBlock.voteData.deposit_root + if status in {DepositRootIncorrect, DepositCountIncorrect}: 
+ raise newException(CorruptDataProvider, + "The deposit log events disagree with the deposit contract state") + m.db.putEth1PersistedTo lastBlock.voteData notice "Eth1 sync progress", @@ -618,14 +655,7 @@ proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async ts = web3Block.timestamp.uint64, number = web3Block.number.uint64 - m.eth1Chain.addBlock Eth1Block( - number: Eth1BlockNumber web3Block.number, - timestamp: Eth1BlockTimestamp web3Block.timestamp, - voteData: Eth1Data( - block_hash: web3Block.hash.asEth2Digest, - deposit_count: lastBlock.voteData.deposit_count, - deposit_root: lastBlock.voteData.deposit_root), - activeValidatorsCount: lastBlock.activeValidatorsCount) + m.eth1Chain.addBlock lastBlock.makeSuccessorWithoutDeposits(web3Block) else: await m.dataProvider.fetchTimestamp(lastBlock) @@ -671,8 +701,18 @@ proc handleEth1Progress(m: Eth1Monitor) {.async.} = # it could easily re-order the steps due to the interruptible # interleaved execution of async code. 
- var eth1SyncedTo = await m.dataProvider.getBlockNumber( - m.eth1Chain.knownStart.block_hash.asBlockHash) + let startBlock = awaitWithTimeout( + m.dataProvider.getBlockByHash(m.eth1Chain.knownStart.block_hash.asBlockHash), + web3Timeouts): + error "Eth1 sync failed to obtain information about the starting block in time" + return + + m.eth1Chain.addBlock Eth1Block( + number: Eth1BlockNumber startBlock.number, + timestamp: Eth1BlockTimestamp startBlock.timestamp, + voteData: m.eth1Chain.knownStart) + + var eth1SyncedTo = Eth1BlockNumber startBlock.number while true: if bnStatus == BeaconNodeStatus.Stopping: @@ -692,18 +732,17 @@ proc handleEth1Progress(m: Eth1Monitor) {.async.} = if targetBlock <= eth1SyncedTo: continue - await m.syncBlockRange(eth1SyncedTo + 1, targetBlock) + let earliestBlockOfInterest = m.earliestBlockOfInterest() + await m.syncBlockRange(eth1SyncedTo + 1, targetBlock, earliestBlockOfInterest) eth1SyncedTo = targetBlock + # We'll clean old blocks that can no longer be voting candidates. + # Technically, we should check that the block is outside of the current + # voting period as determined by its timestamp, but we'll approximate + # this by requiring a much larger difference in block numbers: while m.eth1Chain.blocks.len > 1: - # We'll clean old blocks that can no longer be voting candidates. - # Technically, we should check that the block is outside of the current - # voting period as determined by its timestamp, but we'll approximate - # this by requiring a much larger difference in block numbers. - # (i.e. twice the follow distance). 
let earliestBlock = m.eth1Chain.blocks.peekFirst - if earliestBlock.number < targetBlock and - targetBlock - earliestBlock.number < m.preset.ETH1_FOLLOW_DISTANCE * 2: + if earliestBlock.number > earliestBlockOfInterest: break m.eth1Chain.popFirst() @@ -765,11 +804,13 @@ proc start(m: Eth1Monitor, delayBeforeStart: Duration) = if runFut == m.runFut: error "Eth1 chain monitoring failure, restarting", err = runFut.error.msg m.stop() - m.start(5.seconds) else: fatal "Fatal exception reached", err = runFut.error.msg quit 1 + m.runFut = nil + m.start(5.seconds) + proc start*(m: Eth1Monitor) = m.start(0.seconds) diff --git a/beacon_chain/rpc/debug_api.nim b/beacon_chain/rpc/debug_api.nim index 1eb539897..6e010257b 100644 --- a/beacon_chain/rpc/debug_api.nim +++ b/beacon_chain/rpc/debug_api.nim @@ -1,8 +1,9 @@ import - std/sequtils, + std/[sequtils, deques], json_rpc/[rpcserver, jsonmarshal], chronicles, - ../beacon_node_common, ../eth2_json_rpc_serialization, ../eth2_network, ../peer_pool, ../version, + ../version, ../beacon_node_common, ../eth2_json_rpc_serialization, + ../eth1_monitor, ../validator_duties, ../eth2_network, ../peer_pool, ../spec/[datatypes, digest, presets], ./rpc_utils @@ -10,6 +11,7 @@ logScope: topics = "debugapi" type RpcServer = RpcHttpServer + Eth1Block = eth1_monitor.Eth1Block proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = rpcServer.rpc("get_v1_debug_beacon_states_stateId") do ( @@ -20,3 +22,15 @@ proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = rpcServer.rpc("get_v1_debug_beacon_heads") do ( stateId: string) -> seq[tuple[root: Eth2Digest, slot: Slot]]: return node.chainDag.heads.mapIt((it.root, it.slot)) + + rpcServer.rpc("get_v1_debug_eth1_chain") do () -> seq[Eth1Block]: + return mapIt(node.eth1Monitor.blocks, it) + + rpcServer.rpc("get_v1_debug_eth1_proposal_data") do () -> BlockProposalEth1Data: + let + wallSlot = node.beaconClock.now.slotOrZero + head = node.doChecksAndGetCurrentHead(wallSlot) 
+ + node.chainDag.withState(node.chainDag.tmpState, head.atSlot(wallSlot)): + return node.getBlockProposalEth1Data(state) + diff --git a/beacon_chain/spec/datatypes.nim b/beacon_chain/spec/datatypes.nim index 6c39ba9ed..e2ba2f195 100644 --- a/beacon_chain/spec/datatypes.nim +++ b/beacon_chain/spec/datatypes.nim @@ -667,6 +667,8 @@ func shortLog*(v: SomeBeaconBlock): auto = proposer_index: v.proposer_index, parent_root: shortLog(v.parent_root), state_root: shortLog(v.state_root), + eth1data: v.body.eth1_data, + graffiti: $v.body.graffiti, proposer_slashings_len: v.body.proposer_slashings.len(), attester_slashings_len: v.body.attester_slashings.len(), attestations_len: v.body.attestations.len(), diff --git a/beacon_chain/validator_duties.nim b/beacon_chain/validator_duties.nim index fcc5d4074..b4d0efad5 100644 --- a/beacon_chain/validator_duties.nim +++ b/beacon_chain/validator_duties.nim @@ -182,6 +182,14 @@ proc createAndSendAttestation(node: BeaconNode, beacon_attestation_sent_delay.observe(delayMillis) +proc getBlockProposalEth1Data*(node: BeaconNode, + state: BeaconState): BlockProposalEth1Data = + if node.eth1Monitor.isNil: + BlockProposalEth1Data(vote: state.eth1_data) + else: + let finalizedEth1Data = node.chainDag.getFinalizedEpochRef().eth1_data + node.eth1Monitor.getBlockProposalData(state, finalizedEth1Data) + proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, randao_reveal: ValidatorSig, validator_index: ValidatorIndex, @@ -190,14 +198,8 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, slot: Slot): Option[BeaconBlock] = # Advance state to the slot that we're proposing for node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)): - let (eth1data, deposits) = - if node.eth1Monitor.isNil: - (state.eth1_data, newSeq[Deposit]()) - else: - let finalizedEth1Data = node.chainDag.getFinalizedEpochRef().eth1_data - node.eth1Monitor.getBlockProposalData(state, finalizedEth1Data) - let + eth1Proposal = node.getBlockProposalEth1Data(state) 
poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived func restore(v: var HashedBeaconState) = @@ -213,10 +215,10 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, validator_index, head.root, randao_reveal, - eth1data, + eth1Proposal.vote, graffiti, node.attestationPool[].getAttestationsForBlock(state, cache), - deposits, + eth1Proposal.deposits, node.exitPool[].getProposerSlashingsForBlock(), node.exitPool[].getAttesterSlashingsForBlock(), node.exitPool[].getVoluntaryExitsForBlock(),