diff --git a/beacon_chain/eth1_monitor.nim b/beacon_chain/eth1_monitor.nim index a575c3f5d..41815e4c2 100644 --- a/beacon_chain/eth1_monitor.nim +++ b/beacon_chain/eth1_monitor.nim @@ -34,8 +34,8 @@ contract(DepositContract): const web3Timeouts = 60.seconds - hasDepositRootChecks = defined(has_deposit_root_checks) - hasGenesisDetection* = defined(has_genesis_detection) + hasDepositRootChecks = true # defined(has_deposit_root_checks) + hasGenesisDetection* = true # defined(has_genesis_detection) type Eth1BlockNumber* = uint64 @@ -58,23 +58,36 @@ type blocks: Deque[Eth1Block] blocksByHash: Table[BlockHash, Eth1Block] + Eth1MonitorState = enum + Initialized + Started + Failed + Stopping + Stopped + Eth1Monitor* = ref object - db: BeaconChainDB + state: Eth1MonitorState preset: RuntimePreset + web3Url: string + eth1Network: Option[Eth1Network] + depositContractAddress*: Eth1Address dataProvider: Web3DataProviderRef latestEth1BlockNumber: Eth1BlockNumber eth1Progress: AsyncEvent + db: BeaconChainDB eth1Chain: Eth1Chain knownStart: DepositContractSnapshot - eth2FinalizedDepositsMerkleizer: DepositsMerkleizer runFut: Future[void] + stopFut: Future[void] when hasGenesisDetection: + genesisValidators: seq[ImmutableValidatorData] + genesisValidatorKeyToIndex: Table[ValidatorPubKey, ValidatorIndex] genesisState: NilableBeaconStateRef genesisStateFut: Future[void] @@ -121,11 +134,81 @@ declareGauge eth1_finalized_deposits, declareGauge eth1_chain_len, "The length of the in-memory chain of Eth1 blocks" -template depositContractAddress*(m: Eth1Monitor): Eth1Address = - m.dataProvider.ns.contractAddress +func depositCountU64(s: DepositContractState): uint64 = + for i in 0 .. 23: + doAssert s.deposit_count[i] == 0 -template web3Url*(m: Eth1Monitor): string = - m.dataProvider.url + uint64.fromBytesBE s.deposit_count[24..31] + +when hasGenesisDetection: + import spec/[beaconstate, signatures] + + template hasEnoughValidators(m: Eth1Monitor, blk: Eth1Block): bool = + blk.activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT + + func chainHasEnoughValidators(m: Eth1Monitor): bool = + if m.eth1Chain.blocks.len > 0: + m.hasEnoughValidators(m.eth1Chain.blocks[^1]) + else: + m.knownStart.depositContractState.depositCountU64 >= + m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT + + func isAfterMinGenesisTime(m: Eth1Monitor, blk: Eth1Block): bool = + doAssert blk.timestamp != 0 + let t = genesis_time_from_eth1_timestamp(m.preset, uint64 blk.timestamp) + t >= m.preset.MIN_GENESIS_TIME + + func isGenesisCandidate(m: Eth1Monitor, blk: Eth1Block): bool = + m.hasEnoughValidators(blk) and m.isAfterMinGenesisTime(blk) + + proc findGenesisBlockInRange(m: Eth1Monitor, startBlock, endBlock: Eth1Block): + Future[Eth1Block] {.async, gcsafe.} + + proc signalGenesis(m: Eth1Monitor, genesisState: BeaconStateRef) = + m.genesisState = genesisState + + if not m.genesisStateFut.isNil: + m.genesisStateFut.complete() + m.genesisStateFut = nil + + proc allGenesisDepositsUpTo(m: Eth1Monitor, totalDeposits: uint64): seq[DepositData] = + for i in 0'u64 ..< totalDeposits: + result.add m.db.genesisDeposits.get(i) + + proc createGenesisState(m: Eth1Monitor, eth1Block: Eth1Block): BeaconStateRef = + notice "Generating genesis state", + blockNum = eth1Block.number, + blockHash = eth1Block.voteData.block_hash, + blockTimestamp = eth1Block.timestamp, + totalDeposits = eth1Block.voteData.deposit_count, + activeValidators = eth1Block.activeValidatorsCount + + var deposits = m.allGenesisDepositsUpTo(eth1Block.voteData.deposit_count) + + result = initialize_beacon_state_from_eth1( + m.preset, + eth1Block.voteData.block_hash, + eth1Block.timestamp.uint64, + deposits, {}) + + if eth1Block.activeValidatorsCount != 0: + doAssert result.validators.lenu64 == eth1Block.activeValidatorsCount + + proc produceDerivedData(m: Eth1Monitor, deposit: DepositData) = + let htr = hash_tree_root(deposit) + + if verify_deposit_signature(m.preset, deposit): + let pubkey = deposit.pubkey + if pubkey notin m.genesisValidatorKeyToIndex: + let idx = ValidatorIndex m.genesisValidators.len + m.genesisValidators.add ImmutableValidatorData( + pubkey: pubkey, + withdrawal_credentials: deposit.withdrawal_credentials) + m.genesisValidatorKeyToIndex.insert(pubkey, idx) + + proc processGenesisDeposit*(m: Eth1Monitor, newDeposit: DepositData) = + m.db.genesisDeposits.add newDeposit + m.produceDerivedData(newDeposit) template blocks*(m: Eth1Monitor): Deque[Eth1Block] = m.eth1Chain.blocks @@ -284,7 +367,10 @@ template awaitWithRetries[T](lazyFutExpr: Future[T], proc close*(p: Web3DataProviderRef): Future[void] {.async.} = if p.blockHeadersSubscription != nil: - awaitWithRetries(p.blockHeadersSubscription.unsubscribe()) + try: + awaitWithRetries(p.blockHeadersSubscription.unsubscribe()) + except CatchableError: + debug "Failed to clean up block headers subscription properly" await p.web3.close() @@ -402,9 +488,6 @@ when hasDepositRootChecks: proc onBlockHeaders*(p: Web3DataProviderRef, blockHeaderHandler: BlockHeaderHandler, errorHandler: SubscriptionErrorHandler) {.async.} = - if p.blockHeadersSubscription != nil: - awaitWithRetries(p.blockHeadersSubscription.unsubscribe()) - info "Waiting for new Eth1 block headers" p.blockHeadersSubscription = awaitWithRetries( @@ -415,12 +498,6 @@ proc onBlockHeaders*(p: Web3DataProviderRef, func getDepositsRoot(m: DepositsMerkleizer): Eth2Digest = mixInLength(m.getFinalHash, int m.totalChunks) -func depositCountU64(s: DepositContractState): uint64 = - for i in 0 .. 23: - doAssert s.deposit_count[i] == 0 - - uint64.fromBytesBE s.deposit_count[24..31] - func toDepositContractState(merkleizer: DepositsMerkleizer): DepositContractState = # TODO There is an off by one discrepancy in the size of the arrays here that # need to be investigated. It shouldn't matter as long as the tree is @@ -647,39 +724,18 @@ proc init*(T: type Eth1Monitor, web3Url: string, depositContractAddress: Eth1Address, depositContractSnapshot: DepositContractSnapshot, - eth1Network: Option[Eth1Network]): Future[Result[T, string]] {.async.} = - + eth1Network: Option[Eth1Network]): T = var web3Url = web3Url fixupWeb3Urls web3Url - try: - let dataProviderRes = await Web3DataProvider.new(depositContractAddress, web3Url) - if dataProviderRes.isErr: - return err(dataProviderRes.error) - - let - dataProvider = dataProviderRes.get - web3 = dataProvider.web3 - - if eth1Network.isSome: - let - providerNetwork = awaitWithRetries web3.provider.net_version() - expectedNetwork = case eth1Network.get - of mainnet: "1" - of rinkeby: "4" - of goerli: "5" - if expectedNetwork != providerNetwork: - return err("The specified web3 provider is not attached to the " & - $eth1Network.get & " network") - - return ok T( - db: db, - preset: preset, - knownStart: depositContractSnapshot, - dataProvider: dataProvider, - eth1Progress: newAsyncEvent()) - except CatchableError as err: - return err("Failed to initialize the Eth1 monitor") + T(state: Initialized, + db: db, + preset: preset, + knownStart: depositContractSnapshot, + depositContractAddress: depositContractAddress, + web3Url: web3Url, + eth1Network: eth1Network, + eth1Progress: newAsyncEvent()) proc safeCancel(fut: var Future[void]) = if not fut.isNil and not fut.finished: @@ -690,12 +746,23 @@ proc clear(chain: var Eth1Chain) = chain.blocks.clear() chain.blocksByHash.clear() -proc stop*(m: Eth1Monitor) = +proc resetState(m: Eth1Monitor) {.async.} = safeCancel m.runFut m.eth1Chain.clear() m.latestEth1BlockNumber = 0 + await m.dataProvider.close() + +proc stop*(m: Eth1Monitor) {.async.} = + if m.state == Started: + m.state = Stopping + m.stopFut = resetState(m) + await m.stopFut + m.state = Stopped + elif m.state == Stopping: + await m.stopFut + const votedBlocksSafetyMargin = 50 @@ -705,7 +772,7 @@ proc earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber = proc syncBlockRange(m: Eth1Monitor, merkleizer: ref DepositsMerkleizer, fromBlock, toBlock, - fullSyncFromBlock: Eth1BlockNumber) {.async.} = + fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} = doAssert m.eth1Chain.blocks.len > 0 var currentBlock = fromBlock @@ -742,7 +809,7 @@ proc syncBlockRange(m: Eth1Monitor, depositLogs = try: # Downloading large amounts of deposits can be quite slow - awaitWithTimeout(jsonLogsFut, seconds(600)): + awaitWithTimeout(jsonLogsFut, web3Timeouts): retryOrRaise newException(DataProviderTimeout, "Request time out while obtaining json logs") except CatchableError as err: @@ -794,10 +861,6 @@ proc syncBlockRange(m: Eth1Monitor, ourCount = lastBlock.voteData.deposit_count, ourRoot = lastBlock.voteData.deposit_root - let depositContractState = DepositContractSnapshot( - eth1Block: lastBlock.voteData.block_hash, - depositContractState: merkleizer[].toDepositContractState) - case status of DepositRootIncorrect, DepositCountIncorrect: raise newException(CorruptDataProvider, @@ -812,19 +875,17 @@ proc syncBlockRange(m: Eth1Monitor, depositsProcessed = lastBlock.voteData.deposit_count when hasGenesisDetection: - if m.genesisStateFut != nil: + if blocksWithDeposits.len > 0: for blk in blocksWithDeposits: for deposit in blk.deposits: - if skipBlsCheck or verify_deposit_signature(m.preset, deposit): - let pubkey = deposit.pubkey - if pubkey notin validatorKeyToIndex: - let idx = ValidatorIndex validators.len - validators.add ImmutableValidatorData( - pubkey: pubkey, - withdrawal_credentials: deposit.withdrawal_credentials) - validatorKeyToIndex.insert(pubkey, idx) + m.processGenesisDeposit(deposit) + blk.activeValidatorsCount = m.genesisValidators.lenu64 - blk.activeValidatorsCount = m.db.immutableValidatorData.lenu64 + let depositContractState = DepositContractSnapshot( + eth1Block: blocksWithDeposits[^1].voteData.block_hash, + depositContractState: merkleizer[].toDepositContractState) + + m.db.putEth2FinalizedTo depositContractState if m.genesisStateFut != nil and m.chainHasEnoughValidators: let lastIdx = m.eth1Chain.blocks.len - 1 @@ -845,7 +906,7 @@ proc syncBlockRange(m: Eth1Monitor, var genesisBlockIdx = m.eth1Chain.blocks.len - 1 if m.isAfterMinGenesisTime(m.eth1Chain.blocks[genesisBlockIdx]): - for i in 1 ..< eth1Blocks.len: + for i in 1 ..< blocksWithDeposits.len: let idx = (m.eth1Chain.blocks.len - 1) - i let blk = m.eth1Chain.blocks[idx] awaitWithRetries m.dataProvider.fetchTimestamp(blk) @@ -878,7 +939,52 @@ proc syncBlockRange(m: Eth1Monitor, m.signalGenesis m.createGenesisState(genesisBlock) -proc startEth1Syncing(m: Eth1Monitor) {.async.} = +proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = + if m.state == Failed: + await m.resetState() + elif m.state == Stopping: + await m.stopFut + + if delayBeforeStart != ZeroDuration: + await sleepAsync(delayBeforeStart) + + info "Starting Eth1 deposit contract monitoring", + contract = $m.depositContractAddress, url = m.web3Url + + let dataProviderRes = await Web3DataProvider.new( + m.depositContractAddress, + m.web3Url) + + m.dataProvider = dataProviderRes.tryGet() + let web3 = m.dataProvider.web3 + + if m.state == Initialized and m.eth1Network.isSome: + let + providerNetwork = awaitWithRetries web3.provider.net_version() + expectedNetwork = case m.eth1Network.get + of mainnet: "1" + of rinkeby: "4" + of goerli: "5" + if expectedNetwork != providerNetwork: + fatal "The specified web3 provider serves data for a different network", + expectedNetwork, providerNetwork + quit 1 + + m.state = Started + + await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader) + {.raises: [Defect], gcsafe.}: + try: + if blk.number.uint64 > m.latestEth1BlockNumber: + eth1_latest_head.set blk.number.toGaugeValue + m.latestEth1BlockNumber = Eth1BlockNumber blk.number + m.eth1Progress.fire() + except Exception: + # TODO Investigate why this exception is being raised + raiseAssert "AsyncEvent.fire should not raise exceptions" + do (err: CatchableError): + debug "Error while processing Eth1 block headers subscription", err = err.msg + let eth2PreviouslyFinalizedTo = m.db.getEth2FinalizedTo() if eth2PreviouslyFinalizedTo.isOk: m.knownStart = eth2PreviouslyFinalizedTo.get @@ -912,7 +1018,7 @@ proc startEth1Syncing(m: Eth1Monitor) {.async.} = if not m.genesisStateFut.isNil: m.genesisStateFut.complete() m.genesisStateFut = nil - m.stop() + await m.stop() return await m.eth1Progress.wait() @@ -933,43 +1039,21 @@ proc startEth1Syncing(m: Eth1Monitor) {.async.} = eth1SyncedTo = targetBlock eth1_synced_head.set eth1SyncedTo.toGaugeValue -proc run(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = - if delayBeforeStart != ZeroDuration: - await sleepAsync(delayBeforeStart) - - info "Starting Eth1 deposit contract monitoring", - contract = $m.depositContractAddress, url = m.web3Url - - await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader) - {.raises: [Defect], gcsafe.}: - try: - if blk.number.uint64 > m.latestEth1BlockNumber: - eth1_latest_head.set blk.number.toGaugeValue - m.latestEth1BlockNumber = Eth1BlockNumber blk.number - m.eth1Progress.fire() - except Exception: - # TODO Investigate why this exception is being raised - raiseAssert "AsyncEvent.fire should not raise exceptions" - do (err: CatchableError): - debug "Error while processing Eth1 block headers subscription", err = err.msg - - await m.startEth1Syncing() - proc start(m: Eth1Monitor, delayBeforeStart: Duration) = if m.runFut.isNil: - let runFut = m.run(delayBeforeStart) + let runFut = m.startEth1Syncing(delayBeforeStart) m.runFut = runFut runFut.addCallback do (p: pointer): if runFut.failed: if runFut.error[] of CatchableError: if runFut == m.runFut: error "Eth1 chain monitoring failure, restarting", err = runFut.error.msg - m.stop() + m.state = Failed else: fatal "Fatal exception reached", err = runFut.error.msg quit 1 - m.runFut = nil + safeCancel m.runFut m.start(5.seconds) proc start*(m: Eth1Monitor) = @@ -1022,10 +1106,10 @@ when hasGenesisDetection: depositContractDeployedAt: BlockHashOrNumber, eth1Network: Option[Eth1Network]): Future[Result[T, string]] {.async.} = try: - let dataProviderRes = Web3DataProvider.new(depositContractAddress, web3Url) + let dataProviderRes = await Web3DataProvider.new(depositContractAddress, web3Url) if dataProviderRes.isErr: return err(dataProviderRes.error) - let dataProvider = dataProviderRes.get + var dataProvider = dataProviderRes.get let knownStartBlockHash = if depositContractDeployedAt.isHash: @@ -1054,64 +1138,22 @@ when hasGenesisDetection: let depositContractSnapshot = DepositContractSnapshot( eth1Block: knownStartBlockHash) - return await Eth1Monitor.init( + var monitor = Eth1Monitor.init( db, preset, web3Url, - depositContarctAddress, + depositContractAddress, depositContractSnapshot, eth1Network) + for i in 0 ..< db.genesisDeposits.len: + monitor.produceDerivedData db.genesisDeposits.get(i) + + return ok monitor + except CatchableError as err: return err("Failed to initialize the Eth1 monitor") - proc allGenesisDepositsUpTo(m: Eth1Monitor, totalDeposits: uint64): seq[DepositData] = - for i in 0'u64 ..< totalDeposits: - result.add m.db.genesisDeposits.get(i) - - proc createGenesisState(m: Eth1Monitor, eth1Block: Eth1Block): BeaconStateRef = - notice "Generating genesis state", - blockNum = eth1Block.number, - blockHash = eth1Block.voteData.block_hash, - blockTimestamp = eth1Block.timestamp, - totalDeposits = eth1Block.voteData.deposit_count, - activeValidators = eth1Block.activeValidatorsCount - - var deposits = m.allGenesisDepositsUpTo(eth1Block.voteData.deposit_count) - - result = initialize_beacon_state_from_eth1( - m.preset, - eth1Block.voteData.block_hash, - eth1Block.timestamp.uint64, - deposits, {}) - - doAssert result.validators.lenu64 == eth1Block.activeValidatorsCount - - proc signalGenesis(m: Eth1Monitor, genesisState: BeaconStateRef) = - m.genesisState = genesisState - - if not m.genesisStateFut.isNil: - m.genesisStateFut.complete() - m.genesisStateFut = nil - - template hasEnoughValidators(m: Eth1Monitor, blk: Eth1Block): bool = - blk.activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT - - func chainHasEnoughValidators(m: Eth1Monitor): bool = - if m.eth1Chain.blocks.len > 0: - m.hasEnoughValidators(m.eth1Chain.blocks[^1]) - else: - m.knownStart.depositContractState.depositCountU64 >= - m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT - - func isAfterMinGenesisTime(m: Eth1Monitor, blk: Eth1Block): bool = - doAssert blk.timestamp != 0 - let t = genesis_time_from_eth1_timestamp(m.preset, uint64 blk.timestamp) - t >= m.preset.MIN_GENESIS_TIME - - func isGenesisCandidate(m: Eth1Monitor, blk: Eth1Block): bool = - m.hasEnoughValidators(blk) and m.isAfterMinGenesisTime(blk) - proc findGenesisBlockInRange(m: Eth1Monitor, startBlock, endBlock: Eth1Block): Future[Eth1Block] {.async.} = doAssert startBlock.timestamp != 0 and not m.isAfterMinGenesisTime(startBlock) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index f5caadfc4..8321809d1 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -254,9 +254,7 @@ proc init*(T: type BeaconNode, genesisDepositsSnapshotContents != nil: let genesisDepositsSnapshot = SSZ.decode(genesisDepositsSnapshotContents[], DepositContractSnapshot) - # TODO(zah) if we don't have any validators attached, - # we don't need a mainchain monitor - let eth1MonitorRes = await Eth1Monitor.init( + eth1Monitor = Eth1Monitor.init( db, conf.runtimePreset, conf.web3Url, @@ -264,15 +262,6 @@ proc init*(T: type BeaconNode, genesisDepositsSnapshot, eth1Network) - if eth1MonitorRes.isErr: - error "Failed to start Eth1 monitor", - reason = eth1MonitorRes.error, - web3Url = conf.web3Url, - depositContractAddress, - depositContractDeployedAt - else: - eth1Monitor = eth1MonitorRes.get - let rpcServer = if conf.rpcEnabled: RpcServer.init(conf.rpcAddress, conf.rpcPort) else: diff --git a/beacon_chain/rpc/config_api.nim b/beacon_chain/rpc/config_api.nim index e0939596f..becd22d0d 100644 --- a/beacon_chain/rpc/config_api.nim +++ b/beacon_chain/rpc/config_api.nim @@ -24,7 +24,7 @@ func getDepositAddress(node: BeaconNode): string = if isNil(node.eth1Monitor): "" else: - $node.eth1Monitor.depositContractAddress() + $node.eth1Monitor.depositContractAddress proc installConfigApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = rpcServer.rpc("get_v1_config_fork_schedule") do () -> seq[Fork]: diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 1befeb8c2..9e5ba64c4 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 1befeb8c2e74425787d7bfd7b53d6c60302161c7 +Subproject commit 9e5ba64c4888caa46938b2ef87e74c65cb7fab4b diff --git a/vendor/nim-stew b/vendor/nim-stew index e15c1ae01..5cf4feabe 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit e15c1ae01269b1c53e1c9d80741b6d8929fc4b75 +Subproject commit 5cf4feabea0820d7f03b146b0973a57973bcc4c1