Tentative fix for the Pyrmont Eth1 voting issue

This commit is contained in:
Zahary Karadjov 2020-11-19 19:19:03 +02:00 committed by zah
parent 3c4cf95d0e
commit e22248bca1
4 changed files with 117 additions and 58 deletions

View File

@ -8,6 +8,9 @@ import
export export
web3Types web3Types
logScope:
topics = "eth1"
contract(DepositContract): contract(DepositContract):
proc deposit(pubkey: Bytes48, proc deposit(pubkey: Bytes48,
withdrawalCredentials: Bytes32, withdrawalCredentials: Bytes32,
@ -70,7 +73,6 @@ type
Web3DataProviderRef* = ref Web3DataProvider Web3DataProviderRef* = ref Web3DataProvider
ReorgDepthLimitExceeded = object of CatchableError
CorruptDataProvider = object of CatchableError CorruptDataProvider = object of CatchableError
DisconnectHandler* = proc () {.gcsafe, raises: [Defect].} DisconnectHandler* = proc () {.gcsafe, raises: [Defect].}
@ -81,9 +83,13 @@ type
amount: Bytes8, amount: Bytes8,
signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode) {.raises: [Defect], gcsafe.} signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode) {.raises: [Defect], gcsafe.}
BlockProposalEth1Data* = object
vote*: Eth1Data
deposits*: seq[Deposit]
const const
web3Timeouts = 5.seconds web3Timeouts = 5.seconds
hasDepositRootChecks = defined(with_deposit_root_checks) hasDepositRootChecks = true # defined(with_deposit_root_checks)
template depositContractAddress(m: Eth1Monitor): Eth1Address = template depositContractAddress(m: Eth1Monitor): Eth1Address =
m.dataProvider.ns.contractAddress m.dataProvider.ns.contractAddress
@ -91,6 +97,9 @@ template depositContractAddress(m: Eth1Monitor): Eth1Address =
template web3Url(m: Eth1Monitor): string = template web3Url(m: Eth1Monitor): string =
m.dataProvider.url m.dataProvider.url
template blocks*(m: Eth1Monitor): Deque[Eth1Block] =
m.eth1Chain.blocks
proc fixupWeb3Urls*(web3Url: var string) = proc fixupWeb3Urls*(web3Url: var string) =
## Converts HTTP and HTTPS Infura URLs to their WebSocket equivalents ## Converts HTTP and HTTPS Infura URLs to their WebSocket equivalents
## because we are missing a functional HTTPS client. ## because we are missing a functional HTTPS client.
@ -164,6 +173,17 @@ template findBlock*(eth1Chain: Eth1Chain, hash: BlockHash): Eth1Block =
template findBlock*(eth1Chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block = template findBlock*(eth1Chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block =
getOrDefault(eth1Chain.blocksByHash, asBlockHash(eth1Data.block_hash), nil) getOrDefault(eth1Chain.blocksByHash, asBlockHash(eth1Data.block_hash), nil)
func makeSuccessorWithoutDeposits(existingBlock: Eth1Block,
successor: BlockObject): ETh1Block =
Eth1Block(
number: Eth1BlockNumber successor.number,
timestamp: Eth1BlockTimestamp successor.timestamp,
voteData: Eth1Data(
block_hash: successor.hash.asEth2Digest,
deposit_count: existingBlock.voteData.deposit_count,
deposit_root: existingBlock.voteData.deposit_root),
activeValidatorsCount: existingBlock.activeValidatorsCount)
func latestCandidateBlock(eth1Chain: Eth1Chain, func latestCandidateBlock(eth1Chain: Eth1Chain,
preset: RuntimePreset, preset: RuntimePreset,
periodStart: uint64): Eth1Block = periodStart: uint64): Eth1Block =
@ -180,8 +200,11 @@ func addBlock(eth1Chain: var Eth1Chain, newBlock: Eth1Block) =
eth1Chain.blocks.addLast newBlock eth1Chain.blocks.addLast newBlock
eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
template hash*(x: Eth1Data): Hash =
hash(x.block_hash.data)
template hash*(x: Eth1Block): Hash = template hash*(x: Eth1Block): Hash =
hash(x.voteData.block_hash.data) hash(x.voteData)
proc close*(p: Web3DataProviderRef): Future[void] {.async.} = proc close*(p: Web3DataProviderRef): Future[void] {.async.} =
if p.blockHeadersSubscription != nil: if p.blockHeadersSubscription != nil:
@ -197,17 +220,6 @@ proc getBlockByNumber*(p: Web3DataProviderRef,
number: Eth1BlockNumber): Future[BlockObject] = number: Eth1BlockNumber): Future[BlockObject] =
return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false) return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false)
proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash):
Future[Eth1BlockNumber] {.async.} =
try:
let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts):
return 0
return Eth1BlockNumber(blk.number)
except CatchableError as exc:
debug "Failed to get Eth1 block number from hash",
hash = $hash, err = exc.msg
raise exc
template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped = template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped =
var res: ValueType var res: ValueType
fromJson(j[fieldName], fieldName, res) fromJson(j[fieldName], fieldName, res)
@ -318,7 +330,7 @@ func getDepositsRoot(m: DepositsMerkleizer): Eth2Digest =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#get_eth1_data # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#get_eth1_data
proc getBlockProposalData*(m: Eth1Monitor, proc getBlockProposalData*(m: Eth1Monitor,
state: BeaconState, state: BeaconState,
finalizedEth1Data: Eth1Data): (Eth1Data, seq[Deposit]) = finalizedEth1Data: Eth1Data): BlockProposalEth1Data =
var pendingDepositsCount = state.eth1_data.deposit_count - var pendingDepositsCount = state.eth1_data.deposit_count -
state.eth1_deposit_index state.eth1_deposit_index
@ -347,26 +359,30 @@ proc getBlockProposalData*(m: Eth1Monitor,
let periodStart = voting_period_start_time(state) let periodStart = voting_period_start_time(state)
var otherVotesCountTable = initCountTable[Eth1Block]() var otherVotesCountTable = initCountTable[Eth1Data]()
for vote in state.eth1_data_votes: for vote in state.eth1_data_votes:
# TODO(zah)
# There is a slight deviation from the spec here to deal with the following
# problem: the in-memory database of eth1 blocks for a restarted node will
# be empty which will lead a "no change" vote. To fix this, we'll need to
# add rolling persistance for all potentially voted on blocks.
let eth1Block = m.eth1Chain.findBlock(vote) let eth1Block = m.eth1Chain.findBlock(vote)
if eth1Block != nil and if (eth1Block == nil or is_candidate_block(m.preset, eth1Block, periodStart)) and
is_candidate_block(m.preset, eth1Block, periodStart) and vote.deposit_count > state.eth1_data.deposit_count:
eth1Block.voteData.deposit_count > state.eth1_data.deposit_count: otherVotesCountTable.inc vote
otherVotesCountTable.inc eth1Block
if otherVotesCountTable.len > 0: if otherVotesCountTable.len > 0:
let (winningBlock, votes) = otherVotesCountTable.largest let (winningVote, votes) = otherVotesCountTable.largest
result[0] = winningBlock.voteData result.vote = winningVote
if uint64((votes + 1) * 2) > SLOTS_PER_ETH1_VOTING_PERIOD: if uint64((votes + 1) * 2) > SLOTS_PER_ETH1_VOTING_PERIOD:
pendingDepositsCount = winningBlock.voteData.deposit_count - pendingDepositsCount = winningVote.deposit_count -
state.eth1_deposit_index state.eth1_deposit_index
else: else:
let latestBlock = m.eth1Chain.latestCandidateBlock(m.preset, periodStart) let latestBlock = m.eth1Chain.latestCandidateBlock(m.preset, periodStart)
if latestBlock == nil: if latestBlock == nil:
result[0] = state.eth1_data result.vote = state.eth1_data
else: else:
result[0] = latestBlock.voteData result.vote = latestBlock.voteData
if pendingDepositsCount > 0 and hasLatestDeposits: if pendingDepositsCount > 0 and hasLatestDeposits:
let totalDepositsInNewBlock = min(MAX_DEPOSITS, pendingDepositsCount) let totalDepositsInNewBlock = min(MAX_DEPOSITS, pendingDepositsCount)
@ -389,7 +405,7 @@ proc getBlockProposalData*(m: Eth1Monitor,
deposits[i].proof[32].data[0..7] = deposits[i].proof[32].data[0..7] =
toBytesLE uint64(state.eth1_deposit_index + i + 1) toBytesLE uint64(state.eth1_deposit_index + i + 1)
swap(result[1], deposits) swap(result.deposits, deposits)
proc new(T: type Web3DataProvider, proc new(T: type Web3DataProvider,
depositContractAddress: Eth1Address, depositContractAddress: Eth1Address,
@ -545,7 +561,17 @@ proc safeCancel(fut: var Future[void]) =
proc stop*(m: Eth1Monitor) = proc stop*(m: Eth1Monitor) =
safeCancel m.runFut safeCancel m.runFut
proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async.} = const
votedBlocksSafetyMargin = 20
proc earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber =
m.latestEth1BlockNumber - (2 * m.preset.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin
proc syncBlockRange(m: Eth1Monitor,
fromBlock, toBlock,
fullSyncFromBlock: Eth1BlockNumber) {.async.} =
doAssert m.eth1Chain.blocks.len > 0
var currentBlock = fromBlock var currentBlock = fromBlock
while currentBlock <= toBlock: while currentBlock <= toBlock:
var var
@ -588,6 +614,13 @@ proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async
blk.activeValidatorsCount = m.db.immutableValidatorData.lenu64 blk.activeValidatorsCount = m.db.immutableValidatorData.lenu64
if blk.number > fullSyncFromBlock:
let lastBlock = m.eth1Chain.blocks.peekLast
for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number:
let blockWithoutDeposits = await m.dataProvider.getBlockByNumber(n)
m.eth1Chain.addBlock(
lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits))
m.eth1Chain.addBlock blk m.eth1Chain.addBlock blk
if eth1Blocks.len > 0: if eth1Blocks.len > 0:
@ -596,10 +629,14 @@ proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async
when hasDepositRootChecks: when hasDepositRootChecks:
let status = await m.dataProvider.fetchDepositContractData(lastBlock) let status = await m.dataProvider.fetchDepositContractData(lastBlock)
debug "Deposit root checks", status, debug "Deposit contract state verified", status,
ourCount = lastBlock.voteData.deposit_count, ourCount = lastBlock.voteData.deposit_count,
ourRoot = lastBlock.voteData.deposit_root ourRoot = lastBlock.voteData.deposit_root
if status in {DepositRootIncorrect, DepositCountIncorrect}:
raise newException(CorruptDataProvider,
"The deposit log events disagree with the deposit contract state")
m.db.putEth1PersistedTo lastBlock.voteData m.db.putEth1PersistedTo lastBlock.voteData
notice "Eth1 sync progress", notice "Eth1 sync progress",
@ -618,14 +655,7 @@ proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async
ts = web3Block.timestamp.uint64, ts = web3Block.timestamp.uint64,
number = web3Block.number.uint64 number = web3Block.number.uint64
m.eth1Chain.addBlock Eth1Block( m.eth1Chain.addBlock lastBlock.makeSuccessorWithoutDeposits(web3Block)
number: Eth1BlockNumber web3Block.number,
timestamp: Eth1BlockTimestamp web3Block.timestamp,
voteData: Eth1Data(
block_hash: web3Block.hash.asEth2Digest,
deposit_count: lastBlock.voteData.deposit_count,
deposit_root: lastBlock.voteData.deposit_root),
activeValidatorsCount: lastBlock.activeValidatorsCount)
else: else:
await m.dataProvider.fetchTimestamp(lastBlock) await m.dataProvider.fetchTimestamp(lastBlock)
@ -671,8 +701,18 @@ proc handleEth1Progress(m: Eth1Monitor) {.async.} =
# it could easily re-order the steps due to the interruptible # it could easily re-order the steps due to the interruptible
# interleaved execution of async code. # interleaved execution of async code.
var eth1SyncedTo = await m.dataProvider.getBlockNumber( let startBlock = awaitWithTimeout(
m.eth1Chain.knownStart.block_hash.asBlockHash) m.dataProvider.getBlockByHash(m.eth1Chain.knownStart.block_hash.asBlockHash),
web3Timeouts):
error "Eth1 sync failed to obtain information about the starting block in time"
return
m.eth1Chain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp,
voteData: m.eth1Chain.knownStart)
var eth1SyncedTo = Eth1BlockNumber startBlock.number
while true: while true:
if bnStatus == BeaconNodeStatus.Stopping: if bnStatus == BeaconNodeStatus.Stopping:
@ -692,18 +732,17 @@ proc handleEth1Progress(m: Eth1Monitor) {.async.} =
if targetBlock <= eth1SyncedTo: if targetBlock <= eth1SyncedTo:
continue continue
await m.syncBlockRange(eth1SyncedTo + 1, targetBlock) let earliestBlockOfInterest = m.earliestBlockOfInterest()
await m.syncBlockRange(eth1SyncedTo + 1, targetBlock, earliestBlockOfInterest)
eth1SyncedTo = targetBlock eth1SyncedTo = targetBlock
# We'll clean old blocks that can no longer be voting candidates.
# Technically, we should check that the block is outside of the current
# voting period as determined by its timestamp, but we'll approximate
# this by requiring a much larger difference in block numbers:
while m.eth1Chain.blocks.len > 1: while m.eth1Chain.blocks.len > 1:
# We'll clean old blocks that can no longer be voting candidates.
# Technically, we should check that the block is outside of the current
# voting period as determined by its timestamp, but we'll approximate
# this by requiring a much larger difference in block numbers.
# (i.e. twice the follow distance).
let earliestBlock = m.eth1Chain.blocks.peekFirst let earliestBlock = m.eth1Chain.blocks.peekFirst
if earliestBlock.number < targetBlock and if earliestBlock.number > earliestBlockOfInterest:
targetBlock - earliestBlock.number < m.preset.ETH1_FOLLOW_DISTANCE * 2:
break break
m.eth1Chain.popFirst() m.eth1Chain.popFirst()
@ -765,11 +804,13 @@ proc start(m: Eth1Monitor, delayBeforeStart: Duration) =
if runFut == m.runFut: if runFut == m.runFut:
error "Eth1 chain monitoring failure, restarting", err = runFut.error.msg error "Eth1 chain monitoring failure, restarting", err = runFut.error.msg
m.stop() m.stop()
m.start(5.seconds)
else: else:
fatal "Fatal exception reached", err = runFut.error.msg fatal "Fatal exception reached", err = runFut.error.msg
quit 1 quit 1
m.runFut = nil
m.start(5.seconds)
proc start*(m: Eth1Monitor) = proc start*(m: Eth1Monitor) =
m.start(0.seconds) m.start(0.seconds)

View File

@ -1,8 +1,9 @@
import import
std/sequtils, std/[sequtils, deques],
json_rpc/[rpcserver, jsonmarshal], json_rpc/[rpcserver, jsonmarshal],
chronicles, chronicles,
../beacon_node_common, ../eth2_json_rpc_serialization, ../eth2_network, ../peer_pool, ../version, ../version, ../beacon_node_common, ../eth2_json_rpc_serialization,
../eth1_monitor, ../validator_duties, ../eth2_network, ../peer_pool,
../spec/[datatypes, digest, presets], ../spec/[datatypes, digest, presets],
./rpc_utils ./rpc_utils
@ -10,6 +11,7 @@ logScope: topics = "debugapi"
type type
RpcServer = RpcHttpServer RpcServer = RpcHttpServer
Eth1Block = eth1_monitor.Eth1Block
proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.rpc("get_v1_debug_beacon_states_stateId") do ( rpcServer.rpc("get_v1_debug_beacon_states_stateId") do (
@ -20,3 +22,15 @@ proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.rpc("get_v1_debug_beacon_heads") do ( rpcServer.rpc("get_v1_debug_beacon_heads") do (
stateId: string) -> seq[tuple[root: Eth2Digest, slot: Slot]]: stateId: string) -> seq[tuple[root: Eth2Digest, slot: Slot]]:
return node.chainDag.heads.mapIt((it.root, it.slot)) return node.chainDag.heads.mapIt((it.root, it.slot))
rpcServer.rpc("get_v1_debug_eth1_chain") do () -> seq[Eth1Block]:
return mapIt(node.eth1Monitor.blocks, it)
rpcServer.rpc("get_v1_debug_eth1_proposal_data") do () -> BlockProposalEth1Data:
let
wallSlot = node.beaconClock.now.slotOrZero
head = node.doChecksAndGetCurrentHead(wallSlot)
node.chainDag.withState(node.chainDag.tmpState, head.atSlot(wallSlot)):
return node.getBlockProposalEth1Data(state)

View File

@ -667,6 +667,8 @@ func shortLog*(v: SomeBeaconBlock): auto =
proposer_index: v.proposer_index, proposer_index: v.proposer_index,
parent_root: shortLog(v.parent_root), parent_root: shortLog(v.parent_root),
state_root: shortLog(v.state_root), state_root: shortLog(v.state_root),
eth1data: v.body.eth1_data,
graffiti: $v.body.graffiti,
proposer_slashings_len: v.body.proposer_slashings.len(), proposer_slashings_len: v.body.proposer_slashings.len(),
attester_slashings_len: v.body.attester_slashings.len(), attester_slashings_len: v.body.attester_slashings.len(),
attestations_len: v.body.attestations.len(), attestations_len: v.body.attestations.len(),

View File

@ -182,6 +182,14 @@ proc createAndSendAttestation(node: BeaconNode,
beacon_attestation_sent_delay.observe(delayMillis) beacon_attestation_sent_delay.observe(delayMillis)
proc getBlockProposalEth1Data*(node: BeaconNode,
state: BeaconState): BlockProposalEth1Data =
if node.eth1Monitor.isNil:
BlockProposalEth1Data(vote: state.eth1_data)
else:
let finalizedEth1Data = node.chainDag.getFinalizedEpochRef().eth1_data
node.eth1Monitor.getBlockProposalData(state, finalizedEth1Data)
proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
randao_reveal: ValidatorSig, randao_reveal: ValidatorSig,
validator_index: ValidatorIndex, validator_index: ValidatorIndex,
@ -190,14 +198,8 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
slot: Slot): Option[BeaconBlock] = slot: Slot): Option[BeaconBlock] =
# Advance state to the slot that we're proposing for # Advance state to the slot that we're proposing for
node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)): node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)):
let (eth1data, deposits) =
if node.eth1Monitor.isNil:
(state.eth1_data, newSeq[Deposit]())
else:
let finalizedEth1Data = node.chainDag.getFinalizedEpochRef().eth1_data
node.eth1Monitor.getBlockProposalData(state, finalizedEth1Data)
let let
eth1Proposal = node.getBlockProposalEth1Data(state)
poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived
func restore(v: var HashedBeaconState) = func restore(v: var HashedBeaconState) =
@ -213,10 +215,10 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
validator_index, validator_index,
head.root, head.root,
randao_reveal, randao_reveal,
eth1data, eth1Proposal.vote,
graffiti, graffiti,
node.attestationPool[].getAttestationsForBlock(state, cache), node.attestationPool[].getAttestationsForBlock(state, cache),
deposits, eth1Proposal.deposits,
node.exitPool[].getProposerSlashingsForBlock(), node.exitPool[].getProposerSlashingsForBlock(),
node.exitPool[].getAttesterSlashingsForBlock(), node.exitPool[].getAttesterSlashingsForBlock(),
node.exitPool[].getVoluntaryExitsForBlock(), node.exitPool[].getVoluntaryExitsForBlock(),