From 31e31bb30c71193300d4f94c3b2feb5b936a90aa Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Sat, 27 Jun 2020 15:01:19 +0300 Subject: [PATCH] Switch to monitoring of Blocks; More accurate genesis detection and faster Eth1 syncing --- beacon_chain/beacon_node.nim | 33 +- beacon_chain/mainchain_monitor.nim | 678 ++++++++++++++++------------- beacon_chain/merkle_minimal.nim | 2 +- beacon_chain/spec/beaconstate.nim | 27 +- vendor/nim-web3 | 2 +- 5 files changed, 412 insertions(+), 330 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 24c815016..5fb7463ab 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -145,19 +145,27 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async # Try file from command line first if genesisState.isNil: - # Didn't work, try creating a genesis state using main chain monitor + if conf.web3Url.len == 0: + fatal "Web3 URL not specified" + quit 1 + + if conf.depositContractAddress.len == 0: + fatal "Deposit contract address not specified" + quit 1 + + if conf.depositContractDeployedAt.isNone: + # When we don't have a known genesis state, the network metadata + # must specify the deployment block of the contract. + fatal "Deposit contract deployment block not specified" + quit 1 + # TODO Could move this to a separate "GenesisMonitor" process or task # that would do only this - see Paul's proposal for this. - if conf.web3Url.len > 0 and conf.depositContractAddress.len > 0: - mainchainMonitor = MainchainMonitor.init( - web3Provider(conf.web3Url), - conf.depositContractAddress, - conf.depositContractDeployedAt, - FromContractDeploymentBlock) - mainchainMonitor.start() - else: - error "No initial state, need genesis state or deposit contract address" - quit 1 + mainchainMonitor = MainchainMonitor.init( + web3Provider(conf.web3Url), + conf.depositContractAddress, + Eth1Data(block_hash: conf.depositContractDeployedAt.get, deposit_count: 0)) + mainchainMonitor.start() genesisState = await mainchainMonitor.getGenesis() @@ -195,8 +203,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async mainchainMonitor = MainchainMonitor.init( web3Provider(conf.web3Url), conf.depositContractAddress, - some blockPool.headState.data.data.eth1_data.block_hash, - FromSnapshot) + blockPool.headState.data.data.eth1_data) # TODO if we don't have any validators attached, we don't need a mainchain # monitor mainchainMonitor.start() diff --git a/beacon_chain/mainchain_monitor.nim b/beacon_chain/mainchain_monitor.nim index ed74a3b76..d7b56e33f 100644 --- a/beacon_chain/mainchain_monitor.nim +++ b/beacon_chain/mainchain_monitor.nim @@ -4,6 +4,8 @@ import spec/[datatypes, digest, crypto, beaconstate, helpers], merkle_minimal +from times import epochTime + export ethtypes @@ -21,7 +23,6 @@ contract(DepositContract): amount: Bytes8, signature: Bytes96, index: Bytes8) {.event.} - # TODO # The raises list of this module are still not usable due to general # Exceptions being reported from Chronos's asyncfutures2. @@ -35,36 +36,29 @@ type timestamp*: Eth1BlockTimestamp deposits*: seq[Deposit] voteData*: Eth1Data - - StartKind* = enum - FromContractDeploymentBlock - FromSnapshot + knownGoodDepositsCount*: Option[uint64] Eth1Chain* = object + knownStart: Eth1Data + knownStartBlockNum: Option[Eth1BlockNumber] + blocks: Deque[Eth1Block] blocksByHash: Table[BlockHash, Eth1Block] - startKind: StartKind + allDeposits*: seq[Deposit] MainchainMonitor* = ref object depositContractAddress: Address - startBlock: Option[Eth2Digest] - dataProviderFactory*: DataProviderFactory genesisState: NilableBeaconStateRef genesisStateFut: Future[void] + genesisMonitoringFut: Future[void] eth1Chain: Eth1Chain - depositQueue: AsyncQueue[DepositQueueElem] + depositQueue: AsyncQueue[BlockHeader] runFut: Future[void] - Web3EventType = enum - NewEvent - RemovedEvent - - DepositQueueElem = (BlockHash, Web3EventType) - DataProvider* = object of RootObj DataProviderRef* = ref DataProvider @@ -79,7 +73,7 @@ type url: string web3: Web3 ns: Sender[DepositContract] - subscription: Subscription + blockHeadersSubscription: Subscription Web3DataProviderRef* = ref Web3DataProvider @@ -97,11 +91,20 @@ type const reorgDepthLimit = 1000 web3Timeouts = 5.seconds + followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE) + totalDepositsNeededForGenesis = uint64 max(SLOTS_PER_EPOCH, + MIN_GENESIS_ACTIVE_VALIDATOR_COUNT) # TODO Nim's analysis on the lock level of the methods in this # module seems broken. Investigate and file this as an issue. {.push warning[LockLevel]: off.} +static: + # https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/beacon-chain.md#genesis + when SPEC_VERSION == "0.12.1": + doAssert SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE < GENESIS_DELAY, + "Invalid configuration: GENESIS_DELAY is set too low" + # https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#get_eth1_data func compute_time_at_slot(state: BeaconState, slot: Slot): uint64 = state.genesis_time + slot * SECONDS_PER_SLOT @@ -123,6 +126,9 @@ func asEth2Digest*(x: BlockHash): Eth2Digest = template asBlockHash(x: Eth2Digest): BlockHash = BlockHash(x.data) +func shortLog(b: Eth1Block): string = + &"{b.number}:{shortLog b.voteData.block_hash}" + func getDepositsInRange(eth1Chain: Eth1Chain, sinceBlock, latestBlock: Eth1BlockNumber): seq[Deposit] = ## Returns all deposits that happened AFTER the block `sinceBlock` (not inclusive). @@ -164,41 +170,38 @@ func latestCandidateBlock(eth1Chain: Eth1Chain, periodStart: uint64): Eth1Block if is_candidate_block(blk, periodStart): return blk +func popBlock(eth1Chain: var Eth1Chain) = + let removed = eth1Chain.blocks.popLast + eth1Chain.blocksByHash.del removed.voteData.block_hash.asBlockHash + func trimHeight(eth1Chain: var Eth1Chain, blockNumber: Eth1BlockNumber) = ## Removes all blocks above certain `blockNumber` - if eth1Chain.blocks.len == 0: - return + while eth1Chain.blocks.len > 0: + if eth1Chain.blocks.peekLast.number > blockNumber: + eth1Chain.popBlock() + else: + break - let newLen = max(0, int(blockNumber - eth1Chain.blocks[0].number + 1)) - for i in newLen ..< eth1Chain.blocks.len: - let removed = eth1Chain.blocks.popLast - eth1Chain.blocksByHash.del removed.voteData.block_hash.asBlockHash - -template purgeChain*(eth1Chain: var Eth1Chain, blk: Eth1Block) = - ## This is used when we discover that a previously considered block - ## is no longer part of the selected chain (due to a reorg). We can - ## then remove from our chain together with all blocks that follow it. - trimHeight(eth1Chain, blk.number - 1) - -func purgeChain*(eth1Chain: var Eth1Chain, blockHash: BlockHash) = - let blk = eth1Chain.findBlock(blockHash) - if blk != nil: eth1Chain.purgeChain(blk) - -template purgeDescendants*(eth1CHain: Eth1Chain, blk: Eth1Block) = - trimHeight(eth1Chain, blk.number) + if eth1Chain.blocks.len > 0: + eth1Chain.allDeposits.setLen(eth1Chain.blocks[^1].voteData.deposit_count) + else: + eth1Chain.allDeposits.setLen(0) func isSuccessorBlock(eth1Chain: Eth1Chain, newBlock: Eth1Block): bool = - if eth1Chain.blocks.len == 0: - return eth1Chain.startKind == FromSnapshot or - newBlock.deposits.len.uint64 == newBlock.voteData.deposit_count + let currentDepositCount = if eth1Chain.blocks.len == 0: + eth1Chain.knownStart.deposit_count + else: + let lastBlock = eth1Chain.blocks.peekLast + if lastBlock.number >= newBlock.number: return false + lastBlock.voteData.deposit_count - let lastBlock = eth1Chain.blocks.peekLast - lastBlock.number < newBlock.number and - (lastBlock.voteData.deposit_count + newBlock.deposits.len.uint64) == newBlock.voteData.deposit_count + (currentDepositCount + newBlock.deposits.len.uint64) == newBlock.voteData.deposit_count func addSuccessorBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block): bool = result = isSuccessorBlock(eth1Chain, newBlock) if result: + eth1Chain.allDeposits.add newBlock.deposits + reset newBlock.deposits eth1Chain.blocks.addLast newBlock eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock @@ -227,6 +230,14 @@ method getBlockByHash*(p: DataProviderRef, hash: BlockHash): Future[BlockObject] .} = notImplemented +method getBlockByNumber*(p: DataProviderRef, hash: Eth1BlockNumber): Future[BlockObject] {. + base + gcsafe + locks: 0 + # raises: [Defect] +.} = + notImplemented + method onDisconnect*(p: DataProviderRef, handler: DisconnectHandler) {. base gcsafe @@ -235,9 +246,9 @@ method onDisconnect*(p: DataProviderRef, handler: DisconnectHandler) {. .} = notImplemented -method onDepositEvent*(p: DataProviderRef, - startBlock: Eth1BlockNumber, - handler: DepositEventHandler): Future[void] {. +method onBlockHeaders*(p: DataProviderRef, + blockHeaderHandler: BlockHeaderHandler, + errorHandler: SubscriptionErrorHandler): Future[void] {. base gcsafe locks: 0 @@ -253,8 +264,8 @@ method close*(p: DataProviderRef): Future[void] {. .} = notImplemented -method hasDepositContract*(p: DataProviderRef, - web3Block: BlockObject): Future[bool] {. +method fetchDepositData*(p: DataProviderRef, + fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] {. base gcsafe locks: 0 @@ -262,8 +273,7 @@ method hasDepositContract*(p: DataProviderRef, .} = notImplemented -method fetchDepositData*(p: DataProviderRef, - web3Block: BlockObject): Future[Eth1Block] {. +method fetchBlockDetails(p: DataProviderRef, blk: Eth1Block): Future[void] {. base gcsafe locks: 0 @@ -271,6 +281,146 @@ method fetchDepositData*(p: DataProviderRef, .} = notImplemented +proc new*(T: type Web3DataProvider, + web3Url: string, + depositContractAddress: Address): Future[ref Web3DataProvider] {. + async + # raises: [Defect] +.} = + try: + type R = ref T + let + web3 = await newWeb3(web3Url) + ns = web3.contractSender(DepositContract, depositContractAddress) + return R(url: web3Url, web3: web3, ns: ns) + except CatchableError: + return nil + +func web3Provider*(web3Url: string): DataProviderFactory = + proc factory(depositContractAddress: Address): Future[DataProviderRef] {.async.} = + result = await Web3DataProvider.new(web3Url, depositContractAddress) + + DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory) + +method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} = + if p.blockHeadersSubscription != nil: + await p.blockHeadersSubscription.unsubscribe() + + await p.web3.close() + +method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] = + return p.web3.provider.eth_getBlockByHash(hash, false) + +method getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] = + return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false) + +proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} = + debug "Querying block number", hash = $hash + + try: + let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts): + return 0 + return Eth1BlockNumber(blk.number) + except CatchableError as exc: + notice "Failed to get Eth1 block number from hash", + hash = $hash, err = exc.msg + raise + +template readJsonField(j: JsonNode, + fieldName: string, + ValueType: type): untyped = + var res: ValueType + fromJson(j[fieldName], fieldName, res) + res + +proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] = + if depositsList.kind != JArray: + raise newException(CatchableError, + "Web3 provider didn't return a list of deposit events") + + var lastEth1Block: Eth1Block + + for logEvent in depositsList: + let + blockNumber = Eth1BlockNumber readJsonField(logEvent, "blockNumber", Quantity) + blockHash = readJsonField(logEvent, "blockHash", BlockHash) + logData = strip0xPrefix(logEvent["data"].getStr) + + if lastEth1Block == nil or lastEth1Block.number != blockNumber: + lastEth1Block = Eth1Block( + number: blockNumber, + voteData: Eth1Data(block_hash: blockHash.asEth2Digest)) + + result.add lastEth1Block + + var + pubkey: Bytes48 + withdrawalCredentials: Bytes32 + amount: Bytes8 + signature: Bytes96 + index: Bytes8 + + var offset = 0 + offset += decode(logData, offset, pubkey) + offset += decode(logData, offset, withdrawalCredentials) + offset += decode(logData, offset, amount) + offset += decode(logData, offset, signature) + offset += decode(logData, offset, index) + + lastEth1Block.deposits.add Deposit( + data: DepositData( + pubkey: ValidatorPubKey.init(array[48, byte](pubkey)), + withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)), + amount: bytes_to_int(array[8, byte](amount)), + signature: ValidatorSig.init(array[96, byte](signature)))) + +method fetchDepositData*(p: Web3DataProviderRef, + fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] + {.async, locks: 0.} = + return readJsonDeposits(await p.ns.getJsonLogs(DepositEvent, + fromBlock = some blockId(fromBlock), + toBlock = some blockId(toBlock))) + +method fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = + let + web3Block = p.getBlockByNumber(blk.number) + depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number) + rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number) + + discard await web3Block + discard await depositRoot + discard await rawCount + + let depositCount = bytes_to_int(array[8, byte](rawCount.read)) + + blk.timestamp = Eth1BlockTimestamp(web3Block.read.timestamp) + blk.voteData.deposit_count = depositCount + blk.voteData.deposit_root = depositRoot.read.asEth2Digest + +method onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {. + gcsafe + locks: 0 + # raises: [] +.} = + p.web3.onDisconnect = handler + +method onBlockHeaders*(p: Web3DataProviderRef, + blockHeaderHandler: BlockHeaderHandler, + errorHandler: SubscriptionErrorHandler): Future[void] {. + async + gcsafe + locks: 0 + # raises: [] +.} = + if p.blockHeadersSubscription != nil: + await p.blockHeadersSubscription.unsubscribe() + + debug "Subscribing for block headers" + + let options = newJObject() + p.blockHeadersSubscription = await p.web3.subscribeForBlockHeaders( + options, blockHeaderHandler, errorHandler) + # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#get_eth1_data func getBlockProposalData*(eth1Chain: Eth1Chain, state: BeaconState): (Eth1Data, seq[Deposit]) = @@ -310,175 +460,100 @@ template getBlockProposalData*(m: MainchainMonitor, state: BeaconState): untyped proc init*(T: type MainchainMonitor, dataProviderFactory: DataProviderFactory, depositContractAddress: string, - startBlock: Option[Eth2Digest], - startKind: StartKind): T = - T(depositQueue: newAsyncQueue[DepositQueueElem](), + startPosition: Eth1Data): T = + T(depositQueue: newAsyncQueue[BlockHeader](), dataProviderFactory: dataProviderFactory, depositContractAddress: Address.fromHex(depositContractAddress), - startBlock: startBlock, - eth1Chain: Eth1Chain(startKind: startKind)) + eth1Chain: Eth1Chain(knownStart: startPosition)) -proc readJsonDeposits(depositsList: JsonNode): seq[Deposit] = - if depositsList.kind != JArray: - raise newException(CatchableError, - "Web3 provider didn't return a list of deposit events") +proc isCandidateForGenesis(timeNow: float, blk: Eth1Block): bool = + if float(blk.timestamp + followDistanceInSeconds) > timeNow: + return false - for logEvent in depositsList: - var logData = strip0xPrefix(logEvent["data"].getStr) - var - pubkey: Bytes48 - withdrawalCredentials: Bytes32 - amount: Bytes8 - signature: Bytes96 - index: Bytes8 + if genesis_time_from_eth1_timestamp(blk.timestamp) < MIN_GENESIS_TIME: + return false - var offset = 0 - offset += decode(logData, offset, pubkey) - offset += decode(logData, offset, withdrawalCredentials) - offset += decode(logData, offset, amount) - offset += decode(logData, offset, signature) - offset += decode(logData, offset, index) + if blk.knownGoodDepositsCount.isSome: + blk.knownGoodDepositsCount.get >= totalDepositsNeededForGenesis + else: + blk.voteData.deposit_count >= totalDepositsNeededForGenesis - result.add Deposit( - data: DepositData( - pubkey: ValidatorPubKey.init(array[48, byte](pubkey)), - withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)), - amount: bytes_to_int(array[8, byte](amount)), - signature: ValidatorSig.init(array[96, byte](signature)))) - -proc checkForGenesisEvent(m: MainchainMonitor) = - if not m.genesisState.isNil: +proc minGenesisCandidateBlockIdx(eth1Chain: Eth1Chain): Option[int] + {.raises: [Defect].} = + if eth1Chain.blocks.len == 0: return - let lastBlock = m.eth1Chain.blocks.peekLast - const totalDepositsNeeded = max(SLOTS_PER_EPOCH, - MIN_GENESIS_ACTIVE_VALIDATOR_COUNT) + let now = epochTime() + if not isCandidateForGenesis(now, eth1Chain.blocks.peekLast): + return - if lastBlock.timestamp.uint64 >= MIN_GENESIS_TIME.uint64 and - m.eth1Chain.totalDeposits >= totalDepositsNeeded: - # This block is a genesis candidate - let startTime = lastBlock.timestamp.uint64 - var genesisDeposits = m.eth1Chain.allDeposits - attachMerkleProofs genesisDeposits - var s = initialize_beacon_state_from_eth1(lastBlock.voteData.block_hash, - startTime, genesisDeposits, {}) - if is_valid_genesis_state(s[]): - # https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state - info "Eth2 genesis state detected", - genesisTime = startTime, - genesisEth1Block = lastBlock.voteData.block_hash + var candidatePos = eth1Chain.blocks.len - 1 + while candidatePos > 1: + if not isCandidateForGenesis(now, eth1Chain.blocks[candidatePos - 1]): + break + dec candidatePos - s.genesis_time = startTime - m.genesisState = s + return some(candidatePos) - if not m.genesisStateFut.isNil: - m.genesisStateFut.complete() - m.genesisStateFut = nil +proc tryCreatingGenesisState(m: MainchainMonitor, + genesisCandidate: Eth1Block, + deposits: var openarray[Deposit]): uint64 = + attachMerkleProofs deposits -proc processDeposits(m: MainchainMonitor, - dataProvider: DataProviderRef, - startBlkNum: Eth1BlockNumber) {.async.} = - # ATTENTION! - # Please note that this code is using a queue to guarantee the - # strict serial order of processing of deposits. If we had the - # same code embedded in the deposit contracts events handler, - # it could easily re-order the steps due to the intruptable - # interleaved execution of async code. + let + s = initialize_beacon_state_from_eth1(genesisCandidate.voteData.block_hash, + genesisCandidate.timestamp.uint64, + deposits, {}) + active_validator_indices = get_active_validator_indices(s[], GENESIS_EPOCH) + + if is_valid_genesis_state(s[], active_validator_indices): + # https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state + info "Eth2 genesis state detected", + genesisTime = s.genesisTime, + genesisEth1Block = genesisCandidate.voteData.block_hash + + m.genesisState = s + + if not m.genesisStateFut.isNil: + m.genesisStateFut.complete() + m.genesisStateFut = nil + else: + info "Eth2 genesis candidate block rejected", + `block` = shortLog(genesisCandidate), + validDeposits = active_validator_indices.len, + needed = totalDepositsNeededForGenesis + + uint64(active_validator_indices.len) + +proc checkForGenesisLoop(m: MainchainMonitor) {.async.} = while true: - let (blockHash, eventType) = await m.depositQueue.popFirst() + if not m.genesisState.isNil: + return - if eventType == RemovedEvent: - debug "New Eth1 head selected. Purging history of deposits", - purgedBlock = $blockHash - m.eth1Chain.purgeChain(blockHash) - continue + try: + let genesisCandidateIdx = m.eth1Chain.minGenesisCandidateBlockIdx + if genesisCandidateIdx.isSome: + let + genesisCandidate = m.eth1Chain.blocks[genesisCandidateIdx.get] + eligibleDepositCount = genesisCandidate.voteData.deposit_count - let cachedBlock = m.eth1Chain.findBlock(blockHash) - if cachedBlock == nil: - try: - let web3Block = await dataProvider.getBlockByHash(blockHash) - if Eth1BlockNumber(web3Block.number) < startBlkNum: - warn "Invalid deposit reported from the web3 end-point", - reportedBlock = web3Block.number.uint64, minExpectedBlock = startBlkNum - continue + genesisCandidate.knownGoodDepositsCount = some tryCreatingGenesisState( + m, genesisCandidate, + m.eth1Chain.allDeposits.toOpenArray(0, int(eligibleDepositCount - 1))) + else: + # TODO: check for a stale monitor + discard + except CatchableError as err: + debug "Unexpected error in checkForGenesisLoop", err = err.msg - let eth1Block = await dataProvider.fetchDepositData(web3Block) - - if m.eth1Chain.addSuccessorBlock(eth1Block): - # TODO: We may check that the new deposits produce a merkle - # root matching the `deposit_root` value from the block. - # Not doing this is equivalent to trusting the Eth1 - # execution engine and data provider. - info "Eth1 block processed", eth1data = eth1Block.voteData - m.checkForGenesisEvent() - else: - # We are missing the parent block. - # This shouldn't be happening if the deposits events are reported in - # proper order, but nevertheless let's try to repair our chain: - var cachedParent = m.eth1Chain.findParent(web3Block) - doAssert cachedParent == nil - - var chainOfParents = newSeq[Eth1Block]() - var parentHash = web3Block.parentHash - - var expectedParentBlockNumber = web3Block.number.uint64 - 1 - debug "Eth1 parent block missing. Attempting to request from the network", - parentHash = parentHash.toHex, expectedParentBlockNumber - - while true: - if chainOfParents.len > reorgDepthLimit: - error "Detected Eth1 re-org exceeded the maximum depth limit", - headBlockHash = web3Block.hash.toHex, - ourHeadHash = m.eth1Chain.blocks.peekLast.voteData.block_hash - raise newException(ReorgDepthLimitExceeded, "Reorg depth limit exceeded") - - let parentWeb3Block = await dataProvider.getBlockByHash(parentHash) - if parentWeb3Block.number.uint64 != expectedParentBlockNumber: - error "Eth1 data provider supplied invalid parent block", - parentBlockNumber = parentWeb3Block.number.uint64, - expectedParentBlockNumber, parentHash = parentHash.toHex - raise newException(CorruptDataProvider, - "Parent block with incorrect number") - - if expectedParentBlockNumber <= startBlkNum or - startBlkNum == 0 and not await dataProvider.hasDepositContract(parentWeb3Block): - # We've reached the deposit contract creation - # No more deposit events are expected - m.eth1Chain.clear() - for i in countdown(chainOfParents.len - 1, 0): - let isSuccessor = m.eth1Chain.addSuccessorBlock chainOfParents[i] - doAssert isSuccessor - cachedParent = m.eth1Chain.blocks.peekLast - break - - chainOfParents.add(await dataProvider.fetchDepositData(parentWeb3Block)) - let localParent = m.eth1Chain.findParent(parentWeb3Block) - if localParent != nil: - m.eth1Chain.purgeDescendants(localParent) - for i in countdown(chainOfParents.len - 1, 0): - let isSuccessor = m.eth1Chain.addSuccessorBlock chainOfParents[i] - doAssert isSuccessor - cachedParent = m.eth1Chain.blocks.peekLast - break - - dec expectedParentBlockNumber - parentHash = parentWeb3Block.parentHash - - m.eth1Chain.purgeDescendants(cachedParent) - - except CatchableError as err: - # Connection problem? Put the unprocessed deposit back to queue. - # Raising the exception here will lead to a restart of the whole monitor. - m.depositQueue.addFirstNoWait((blockHash, eventType)) - raise err - -proc isRunning*(m: MainchainMonitor): bool = - not m.runFut.isNil + await sleepAsync(1.seconds) proc getGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} = if m.genesisState.isNil: if m.genesisStateFut.isNil: m.genesisStateFut = newFuture[void]("getGenesis") + + m.genesisMonitoringFut = m.checkForGenesisLoop() await m.genesisStateFut m.genesisStateFut = nil @@ -488,92 +563,101 @@ proc getGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} = result = new BeaconStateRef # make the compiler happy raiseAssert "Unreachable code" -method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] = - return p.web3.provider.eth_getBlockByHash(hash, false) +func totalNonFinalizedBlocks(eth1Chain: Eth1Chain): Natural = + # TODO: implement this precisely + eth1Chain.blocks.len -method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} = - if p.subscription != nil: - await p.subscription.unsubscribe() - await p.web3.close() +func latestEth1Data(eth1Chain: Eth1Chain): Eth1Data = + if eth1Chain.blocks.len > 0: + eth1Chain.blocks[^1].voteData + else: + eth1Chain.knownStart -method hasDepositContract*(p: Web3DataProviderRef, - web3Block: BlockObject): Future[bool] {.async, locks: 0.} = - result = await p.ns.isDeployed(web3Block.blockId) +func knownInvalidDepositsCount(eth1Chain: Eth1Chain): uint64 = + for i in countdown(eth1Chain.blocks.len - 1, 0): + let blk = eth1Chain.blocks[i] + if blk.knownGoodDepositsCount.isSome: + return blk.voteData.deposit_count - blk.knownGoodDepositsCount.get -method fetchDepositData*(p: Web3DataProviderRef, - web3Block: BlockObject): Future[Eth1Block] {.async, locks: 0.} = - let - blockHash = web3Block.hash - blockId = web3Block.blockId - depositRoot = await p.ns.get_deposit_root.call(blockNumber = web3Block.number.uint64) - rawCount = await p.ns.get_deposit_count.call(blockNumber = web3Block.number.uint64) - depositCount = bytes_to_int(array[8, byte](rawCount)) - depositsJson = await p.ns.getJsonLogs(DepositEvent, fromBlock = some(blockId), toBlock = some(blockId)) - deposits = readJsonDeposits(depositsJson) + return 0 - return Eth1Block( - number: Eth1BlockNumber(web3Block.number), - timestamp: Eth1BlockTimestamp(web3Block.timestamp), - deposits: deposits, - voteData: Eth1Data(deposit_root: depositRoot.asEth2Digest, - deposit_count: depositCount, - block_hash: blockHash.asEth2Digest)) +func maxValidDeposits(eth1Chain: Eth1Chain): uint64 = + if eth1Chain.blocks.len > 0: + let lastBlock = eth1Chain.blocks[^1] + lastBlock.knownGoodDepositsCount.get( + lastBlock.voteData.deposit_count - eth1Chain.knownInvalidDepositsCount) + else: + 0 -method onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {. - gcsafe - locks: 0 - # raises: [] -.} = - p.web3.onDisconnect = handler +proc processDeposits(m: MainchainMonitor, + dataProvider: DataProviderRef) {.async.} = + # ATTENTION! + # Please note that this code is using a queue to guarantee the + # strict serial order of processing of deposits. If we had the + # same code embedded in the deposit contracts events handler, + # it could easily re-order the steps due to the intruptable + # interleaved execution of async code. + while true: + let blk = await m.depositQueue.popFirst() + m.eth1Chain.trimHeight(Eth1BlockNumber(blk.number) - 1) -method onDepositEvent*(p: Web3DataProviderRef, - startBlock: Eth1BlockNumber, - handler: DepositEventHandler): Future[void] {. - async - gcsafe - locks: 0 - # raises: [] -.} = - if p.subscription != nil: - await p.subscription.unsubscribe() + let latestKnownBlock = if m.eth1Chain.blocks.len > 0: + m.eth1Chain.blocks[^1].number + elif m.eth1Chain.knownStartBlockNum.isSome: + m.eth1Chain.knownStartBlockNum.get + else: + m.eth1Chain.knownStartBlockNum = some( + await dataProvider.getBlockNumber(m.eth1Chain.knownStart.block_hash.asBlockHash)) + m.eth1Chain.knownStartBlockNum.get - debug "Subscribing for deposit events", startBlock + let eth1Blocks = await dataProvider.fetchDepositData(latestKnownBlock + 1, + Eth1BlockNumber blk.number) + if eth1Blocks.len == 0: + if m.eth1Chain.maxValidDeposits > totalDepositsNeededForGenesis: + let latestEth1Data = m.eth1Chain.latestEth1Data - p.subscription = await p.ns.subscribe( - DepositEvent, %*{"fromBlock": &"0x{startBlock:X}"}, handler) + for missingBlockNum in latestKnownBlock + 1 ..< Eth1BlockNumber(blk.number): + let missingBlock = await dataProvider.getBlockByNumber(missingBlockNum) + doAssert m.eth1Chain.addSuccessorBlock Eth1Block( + number: Eth1BlockNumber(missingBlock.number), + timestamp: Eth1BlockTimestamp(missingBlock.timestamp), + voteData: latestEth1Data) -proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} = - debug "Querying block number", hash = $hash + doAssert m.eth1Chain.addSuccessorBlock Eth1Block( + number: Eth1BlockNumber(blk.number), + timestamp: Eth1BlockTimestamp(blk.timestamp), + voteData: latestEth1Data) + else: + template logBlockProcessed(blk) = + info "Eth1 block processed", + `block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count - try: - let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts): - return 0 - return Eth1BlockNumber(blk.number) - except CatchableError as exc: - notice "Failed to get Eth1 block number from hash", - hash = $hash, err = exc.msg - raise + await dataProvider.fetchBlockDetails(eth1Blocks[0]) + if m.eth1Chain.addSuccessorBlock(eth1Blocks[0]): + logBlockProcessed eth1Blocks[0] -proc new*(T: type Web3DataProvider, - web3Url: string, - depositContractAddress: Address): Future[ref Web3DataProvider] {. - async - # raises: [Defect] -.} = - try: - type R = ref T - let - web3 = await newWeb3(web3Url) - ns = web3.contractSender(DepositContract, depositContractAddress) - return R(url: web3Url, web3: web3, ns: ns) - except CatchableError: - return nil + for i in 1 ..< eth1Blocks.len: + await dataProvider.fetchBlockDetails(eth1Blocks[i]) + if m.eth1Chain.addSuccessorBlock(eth1Blocks[i]): + logBlockProcessed eth1Blocks[i] + else: + raise newException(CorruptDataProvider, + "A non-successor Eth1 block reported") + else: + # A non-continuous chain detected. + # This could be the result of a deeper fork that was not reported + # properly by the web3 provider. Since this should be an extremely + # rare event we can afford to handle it in a relatively inefficient + # manner. Let's delete half of our non-finalized chain and try again. + let blocksToPop = max(1, m.eth1Chain.totalNonFinalizedBlocks div 2) + warn "Web3 provider responded with a non-continous chain of deposits.", + backtrackedDeposits = blocksToPop + for i in 0 ..< blocksToPop: + m.eth1Chain.popBlock() + m.depositQueue.addFirstNoWait blk -func web3Provider*(web3Url: string): DataProviderFactory = - proc factory(depositContractAddress: Address): Future[DataProviderRef] {.async.} = - result = await Web3DataProvider.new(web3Url, depositContractAddress) - - DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory) +proc isRunning*(m: MainchainMonitor): bool = + not m.runFut.isNil func `===`(json: JsonNode, boolean: bool): bool = json.kind == JBool and json.bval == boolean @@ -589,43 +673,36 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} = raise newException(CatchableError, "Failed to initialize Eth1 data provider") try: - let startBlkNum = if m.startBlock.isSome: - await dataProvider.getBlockNumber(m.startBlock.get.asBlockHash) - else: - 0 - info "Monitoring eth1 deposits", - fromBlock = startBlkNum.uint64, contract = $m.depositContractAddress, url = m.dataProviderFactory.desc - await dataProvider.onDepositEvent(Eth1BlockNumber(startBlkNum)) do ( - pubkey: Bytes48, - withdrawalCredentials: Bytes32, - amount: Bytes8, - signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode) - {.raises: [Defect], gcsafe.}: + await dataProvider.onBlockHeaders do (blk: BlockHeader) + {.raises: [Defect], gcsafe}: try: - let - blockHash = BlockHash.fromHex(j["blockHash"].getStr()) - eventType = if j{"removed"} === true: RemovedEvent - else: NewEvent + m.depositQueue.addLastNoWait(blk) + except AsyncQueueFullError: + raiseAssert "The depositQueue has no size limit" + except Exception: + # TODO Investigate why this exception is being raised + raiseAssert "queue.addLastNoWait should not raise exceptions" + do (err: CatchableError): + debug "Error while processing Eth1 block headers subscription", err = err.msg - m.depositQueue.addLastNoWait((blockHash, eventType)) + await m.processDeposits(dataProvider) - except CatchableError as exc: - warn "Received invalid deposit", err = exc.msg, j - except Exception as err: - # chronos still raises exceptions which inherit directly from Exception - if err[] of Defect: - raise (ref Defect)(err) - else: - warn "Received invalid deposit", err = err.msg, j - - await m.processDeposits(dataProvider, startBlkNum) finally: await close(dataProvider) +proc safeCancel(fut: var Future[void]) = + if not fut.isNil: + fut.cancel() + fut = nil + +proc stop*(m: MainchainMonitor) = + safeCancel m.runFut + safeCancel m.genesisMonitoringFut + proc start(m: MainchainMonitor, delayBeforeStart: Duration) = if m.runFut.isNil: let runFut = m.run(delayBeforeStart) @@ -635,7 +712,7 @@ proc start(m: MainchainMonitor, delayBeforeStart: Duration) = if runFut.error[] of CatchableError: if runFut == m.runFut: error "Mainchain monitor failure, restarting", err = runFut.error.msg - m.runFut = nil + m.stop() m.start(5.seconds) else: fatal "Fatal exception reached", err = runFut.error.msg @@ -644,11 +721,6 @@ proc start(m: MainchainMonitor, delayBeforeStart: Duration) = proc start*(m: MainchainMonitor) {.inline.} = m.start(0.seconds) -proc stop*(m: MainchainMonitor) = - if not m.runFut.isNil: - m.runFut.cancel() - m.runFut = nil - proc getLatestEth1BlockHash*(url: string): Future[Eth2Digest] {.async.} = let web3 = await newWeb3(url) try: diff --git a/beacon_chain/merkle_minimal.nim b/beacon_chain/merkle_minimal.nim index c330c9987..b9123a8d7 100644 --- a/beacon_chain/merkle_minimal.nim +++ b/beacon_chain/merkle_minimal.nim @@ -83,7 +83,7 @@ proc getMerkleProof*[Depth: static int](tree: SparseMerkleTree[Depth], else: result[depth] = zeroHashes[depth] -proc attachMerkleProofs*(deposits: var seq[Deposit]) = +proc attachMerkleProofs*(deposits: var openarray[Deposit]) = let deposit_data_roots = mapIt(deposits, it.data.hash_tree_root) var deposit_data_sums: seq[Eth2Digest] diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index 9ad5c9b2e..b50a12c2d 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -83,7 +83,9 @@ proc process_deposit*( if not verify_deposit_signature(deposit.data): # It's ok that deposits fail - they get included in blocks regardless # TODO spec test? - debug "Skipping deposit with invalid signature", + # TODO: This is temporary set to trace level in order to deal with the + # large number of invalid deposits on Altona + trace "Skipping deposit with invalid signature", deposit = shortLog(deposit.data) return true @@ -192,12 +194,20 @@ proc slash_validator*(state: var BeaconState, slashed_index: ValidatorIndex, increase_balance( state, whistleblower_index, whistleblowing_reward - proposer_reward) +func genesis_time_from_eth1_timestamp*(eth1_timestamp: uint64): uint64 = + # TODO: remove once we switch completely to v0.12.1 + when SPEC_VERSION == "0.12.1": + eth1_timestamp + GENESIS_DELAY + else: + const SECONDS_PER_DAY = uint64(60*60*24) + eth1_timestamp + 2'u64 * SECONDS_PER_DAY - (eth1_timestamp mod SECONDS_PER_DAY) + # https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/beacon-chain.md#genesis proc initialize_beacon_state_from_eth1*( eth1_block_hash: Eth2Digest, eth1_timestamp: uint64, deposits: openArray[Deposit], - flags: UpdateFlags = {}): BeaconStateRef {.nbench.}= + flags: UpdateFlags = {}): BeaconStateRef {.nbench.} = ## Get the genesis ``BeaconState``. ## ## Before the beacon chain starts, validators will register in the Eth1 chain @@ -214,19 +224,12 @@ proc initialize_beacon_state_from_eth1*( # at that point :) doAssert deposits.len >= SLOTS_PER_EPOCH - const SECONDS_PER_DAY = uint64(60*60*24) var state = BeaconStateRef( fork: Fork( previous_version: Version(GENESIS_FORK_VERSION), current_version: Version(GENESIS_FORK_VERSION), epoch: GENESIS_EPOCH), - genesis_time: - # TODO: remove once we switch completely to v0.12.1 - when SPEC_VERSION == "0.12.1": - eth1_timestamp + GENESIS_DELAY - else: - eth1_timestamp + 2'u64 * SECONDS_PER_DAY - - (eth1_timestamp mod SECONDS_PER_DAY), + genesis_time: genesis_time_from_eth1_timestamp(eth1_timestamp), eth1_data: Eth1Data(block_hash: eth1_block_hash, deposit_count: uint64(len(deposits))), latest_block_header: @@ -280,11 +283,11 @@ proc initialize_hashed_beacon_state_from_eth1*( eth1_block_hash, eth1_timestamp, deposits, flags) HashedBeaconState(data: genesisState[], root: hash_tree_root(genesisState[])) -func is_valid_genesis_state*(state: BeaconState): bool = +func is_valid_genesis_state*(state: BeaconState, active_validator_indices: seq[ValidatorIndex]): bool = if state.genesis_time < MIN_GENESIS_TIME: return false # This is an okay get_active_validator_indices(...) for the time being. - if len(get_active_validator_indices(state, GENESIS_EPOCH)) < MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: + if len(active_validator_indices) < MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: return false return true diff --git a/vendor/nim-web3 b/vendor/nim-web3 index 694ff2ad7..a75519fe1 160000 --- a/vendor/nim-web3 +++ b/vendor/nim-web3 @@ -1 +1 @@ -Subproject commit 694ff2ad74b36f7f8402235f92134cb5532661d4 +Subproject commit a75519fe1264ea861fb65eb2ffec1d6566ebd033