diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 173847282..e6442e6d3 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -450,17 +450,6 @@ when useNativeSnappy: else: include libp2p_streams_backend -template awaitWithTimeout[T](operation: Future[T], - deadline: Future[void], - onTimeout: untyped): T = - let f = operation - await f or deadline - if not f.finished: - cancel f - onTimeout - else: - f.read - proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, timeout: Duration): Future[NetRes[ResponseMsg]] diff --git a/beacon_chain/mainchain_monitor.nim b/beacon_chain/mainchain_monitor.nim index c630486e3..d12e2a702 100644 --- a/beacon_chain/mainchain_monitor.nim +++ b/beacon_chain/mainchain_monitor.nim @@ -1,7 +1,8 @@ import deques, tables, hashes, options, - chronos, web3, json, chronicles, - spec/[datatypes, digest, crypto, beaconstate, helpers] + chronos, web3, json, chronicles, eth/async_utils, + spec/[datatypes, digest, crypto, beaconstate, helpers], + merkle_minimal contract(DepositContract): proc deposit(pubkey: Bytes48, @@ -86,6 +87,7 @@ type const reorgDepthLimit = 1000 + web3Timeouts = 5.seconds # TODO Nim's analysis on the lock level of the methods in this # module seems broken. Investigate and file this as an issue. @@ -304,13 +306,13 @@ proc init*(T: type MainchainMonitor, const MIN_GENESIS_TIME = 0 -proc readJsonDeposits(json: JsonNode): seq[Deposit] = - if json.kind != JArray: +proc readJsonDeposits(depositsList: JsonNode): seq[Deposit] = + if depositsList.kind != JArray: raise newException(CatchableError, "Web3 provider didn't return a list of deposit events") - for logEvent in json: - var logData = strip0xPrefix(json["data"].getStr) + for logEvent in depositsList: + var logData = strip0xPrefix(logEvent["data"].getStr) var pubkey: Bytes48 withdrawalCredentials: Bytes32 @@ -326,7 +328,6 @@ proc readJsonDeposits(json: JsonNode): seq[Deposit] = offset = decode(logData, offset, index) result.add Deposit( - # proof: TODO data: DepositData( pubkey: ValidatorPubKey.init(array[48, byte](pubkey)), withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)), @@ -345,13 +346,20 @@ proc checkForGenesisEvent(m: MainchainMonitor) = m.eth1Chain.totalDeposits >= totalDepositsNeeded: # This block is a genesis candidate let startTime = lastBlock.timestamp.uint64 + var genesisDeposits = m.eth1Chain.allDeposits + attachMerkleProofs genesisDeposits var s = initialize_beacon_state_from_eth1(lastBlock.voteData.block_hash, - startTime, m.eth1Chain.allDeposits, {}) + startTime, genesisDeposits, {}) if is_valid_genesis_state(s[]): # https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state s.genesis_time = startTime + info "Eth2 genesis state detected", + genesisTime = startTime, + genesisEth1Block = lastBlock.voteData.block_hash + m.genesisState = s + if not m.genesisStateFut.isNil: m.genesisStateFut.complete() m.genesisStateFut = nil @@ -370,6 +378,8 @@ proc processDeposits(m: MainchainMonitor, dataProvider: DataProviderRef) {. let (blockHash, eventType) = await m.depositQueue.popFirst() if eventType == RemovedEvent: + info "New Eth1 head selected. Purging history of deposits", + purgedBlock = $blockHash m.eth1Chain.purgeChain(blockHash) continue @@ -421,14 +431,19 @@ proc processDeposits(m: MainchainMonitor, dataProvider: DataProviderRef) {. m.eth1Chain.purgeDescendants(cachedParent) + # TODO: We may check that the new deposits produce a merkle + # root matching the `deposit_root` value from the block. + # Not doing this is equivalent to trusting the Eth1 + # execution engine and data provider. + info "Eth1 block processed", eth1data = eth1Block.voteData m.eth1Chain.addBlock eth1Block m.checkForGenesisEvent() - except CatchableError: + except CatchableError as err: # Connection problem? Put the unprocessed deposit back to queue. # Raising the exception here will lead to a restart of the whole monitor. m.depositQueue.addFirstNoWait((blockHash, eventType)) - raise + raise err proc isRunning*(m: MainchainMonitor): bool = not m.runFut.isNil @@ -447,8 +462,7 @@ proc getGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} = raiseAssert "Unreachable code" method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] = - discard - # p.web3.provider.eth_getBlockByHash(hash, false) + p.web3.provider.eth_getBlockByHash(hash, false) method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} = if p.subscription != nil: @@ -498,7 +512,8 @@ proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Quantity] {.asy debug "Querying block number", hash = $hash try: - let blk = await p.getBlockByHash(hash) + let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts): + return Quantity(0'u64) return blk.number except CatchableError as exc: notice "Failed to get Eth1 block number from hash", @@ -540,12 +555,16 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} = let processFut = m.processDeposits(dataProvider) try: dataProvider.onDisconnect do: - error "Eth1 data provider disconnected", + warn "Eth1 data provider disconnected", provider = m.dataProviderFactory.desc processFut.cancel() - let startBlkNum = await dataProvider.getBlockNumber(m.startBlock) - notice "Monitoring eth1 deposits", + let startBlkNum = if m.startBlock != default(BlockHash): + await dataProvider.getBlockNumber(m.startBlock) + else: + Quantity(0'u64) + + info "Monitoring eth1 deposits", fromBlock = startBlkNum.uint64, contract = $m.depositContractAddress, url = m.dataProviderFactory.desc diff --git a/vendor/nim-eth b/vendor/nim-eth index c13e59ada..da56b0531 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit c13e59adaf988f9bb754dba05b0efa9176ffe664 +Subproject commit da56b0531eb7ced50ffa3470e93d30c3a4a78434