diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index d7b0d3fd3..20e302a7d 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -132,6 +132,7 @@ proc createSeq*(db: SqStoreRef, baseDir, name: string, T: type): DbSeq[T] = proc add*[T](s: var DbSeq[T], val: T) = var bytes = SSZ.encode(val) s.insertStmt.exec(bytes).expect "working database" + inc s.recordCount template len*[T](s: DbSeq[T]): uint64 = s.recordCount.uint64 @@ -274,7 +275,7 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) = proc putGenesisBlockRoot*(db: BeaconChainDB, key: Eth2Digest) = db.put(subkey(kGenesisBlockRoot), key) -proc putEth1PersistedTo*(db: BeaconChainDB, key: Eth2Digest) = +proc putEth1PersistedTo*(db: BeaconChainDB, key: Eth1Data) = db.put(subkey(kEth1PersistedTo), key) proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[TrustedSignedBeaconBlock] = @@ -323,9 +324,10 @@ proc getGenesisBlockRoot*(db: BeaconChainDB): Eth2Digest = db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect( "The database must be seeded with the genesis state") -proc getEth1PersistedTo*(db: BeaconChainDB): Eth2Digest = - db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect( - "The database must be seeded with genesis eth1 data") +proc getEth1PersistedTo*(db: BeaconChainDB): Opt[Eth1Data] = + result.ok(Eth1Data()) + if db.get(subkey(kEth1PersistedTo), result.get) != GetResult.found: + result.err() proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool = db.backend.contains(subkey(SignedBeaconBlock, key)).expect("working database") diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index d83c56660..4dfd35b4b 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -69,6 +69,27 @@ func enrForkIdFromState(state: BeaconState): ENRForkID = next_fork_version: forkVer, next_fork_epoch: FAR_FUTURE_EPOCH) +proc startMainchainMonitor(db: BeaconChainDB, + conf: BeaconNodeConf): Future[MainchainMonitor] {.async.} = + let mainchainMonitorRes = await MainchainMonitor.init( + db, + conf.runtimePreset, + conf.web3Url, + conf.depositContractAddress.get, + conf.depositContractDeployedAt.get) + + result = if mainchainMonitorRes.isOk: + mainchainMonitorRes.get + else: + fatal "Failed to start Eth1 monitor", + reason = mainchainMonitorRes.error, + web3Url = conf.web3Url, + depositContractAddress = conf.depositContractAddress.get, + depositContractDeployedAt = conf.depositContractDeployedAt.get + quit 1 + + result.start() + proc init*(T: type BeaconNode, rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, @@ -139,30 +160,9 @@ proc init*(T: type BeaconNode, fatal "Deposit contract deployment block not specified" quit 1 - let web3 = web3Provider(conf.web3Url) - let deployedAtAsHash = - if conf.depositContractDeployedAt.get.startsWith "0x": - try: BlockHash.fromHex conf.depositContractDeployedAt.get - except ValueError: - fatal "Invalid hex value specified for deposit-contract-block" - quit 1 - else: - let blockNum = try: parseBiggestUInt conf.depositContractDeployedAt.get - except ValueError: - fatal "Invalid nummeric value for deposit-contract-block" - quit 1 - await getEth1BlockHash(conf.web3Url, blockId blockNum) - # TODO Could move this to a separate "GenesisMonitor" process or task # that would do only this - see Paul's proposal for this. - mainchainMonitor = MainchainMonitor.init( - db, - conf.runtimePreset, - web3, - conf.depositContractAddress.get, - Eth1Data(block_hash: deployedAtAsHash.asEth2Digest, deposit_count: 0)) - - mainchainMonitor.start() + mainchainMonitor = await startMainchainMonitor(db, conf) genesisState = await mainchainMonitor.waitGenesis() if bnStatus == BeaconNodeStatus.Stopping: @@ -232,16 +232,11 @@ proc init*(T: type BeaconNode, if mainchainMonitor.isNil and conf.web3Url.len > 0 and - conf.depositContractAddress.isSome: - mainchainMonitor = MainchainMonitor.init( - db, - conf.runtimePreset, - web3Provider(conf.web3Url), - conf.depositContractAddress.get, - chainDag.headState.data.data.eth1_data) - # TODO if we don't have any validators attached, we don't need a mainchain - # monitor - mainchainMonitor.start() + conf.depositContractAddress.isSome and + conf.depositContractDeployedAt.isSome: + # TODO if we don't have any validators attached, + # we don't need a mainchain monitor + mainchainMonitor = await startMainchainMonitor(db, conf) let rpcServer = if conf.rpcEnabled: RpcServer.init(conf.rpcAddress, conf.rpcPort) diff --git a/beacon_chain/mainchain_monitor.nim b/beacon_chain/mainchain_monitor.nim index e6acdd304..14fa95b89 100644 --- a/beacon_chain/mainchain_monitor.nim +++ b/beacon_chain/mainchain_monitor.nim @@ -1,5 +1,5 @@ import - std/[deques, tables, hashes, options, strformat], + std/[deques, tables, hashes, options, strformat, strutils], chronos, web3, web3/ethtypes as web3Types, json, chronicles, eth/common/eth_types, eth/async_utils, spec/[datatypes, digest, crypto, beaconstate, helpers, signatures], @@ -51,31 +51,19 @@ type MainchainMonitor* = ref object db: BeaconChainDB - preset: RuntimePreset - depositContractAddress: Address - dataProviderFactory*: DataProviderFactory + + dataProvider: Web3DataProviderRef + depositQueue: AsyncQueue[Eth1BlockHeader] + eth1Chain: Eth1Chain genesisState: NilableBeaconStateRef genesisStateFut: Future[void] genesisMonitoringFut: Future[void] - eth1Chain: Eth1Chain - - depositQueue: AsyncQueue[Eth1BlockHeader] runFut: Future[void] - DataProvider* = object of RootObj - DataProviderRef* = ref DataProvider - - DataProviderFactory* = object - desc: string - new: proc(depositContractAddress: Address): Future[DataProviderRef] {. - gcsafe - # raises: [Defect] - .} - - Web3DataProvider* = object of DataProvider + Web3DataProvider* = object url: string web3: Web3 ns: Sender[DepositContract] @@ -97,15 +85,17 @@ type const web3Timeouts = 5.seconds +template depositContractAddress(m: MainchainMonitor): Eth1Address = + m.dataProvider.ns.contractAddress + +template web3Url(m: MainchainMonitor): string = + m.dataProvider.url + # TODO: Add preset validation # MIN_GENESIS_ACTIVE_VALIDATOR_COUNT should be larger than SLOTS_PER_EPOCH # doAssert SECONDS_PER_ETH1_BLOCK * preset.ETH1_FOLLOW_DISTANCE < GENESIS_DELAY, # "Invalid configuration: GENESIS_DELAY is set too low" -# TODO Nim's analysis on the lock level of the methods in this -# module seems broken. Investigate and file this as an issue. -{.push warning[LockLevel]: off.} - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#get_eth1_data func compute_time_at_slot(state: BeaconState, slot: Slot): uint64 = state.genesis_time + slot * SECONDS_PER_SLOT @@ -209,102 +199,19 @@ func clear*(eth1Chain: var Eth1Chain) = template hash*(x: Eth1Block): Hash = hash(x.voteData.block_hash.data) -template notImplemented = - doAssert false, "Method not implemented" - -method getBlockByHash*(p: DataProviderRef, hash: BlockHash): Future[BlockObject] {. - base - gcsafe - locks: 0 - # raises: [Defect] -.} = - notImplemented - -method getBlockByNumber*(p: DataProviderRef, hash: Eth1BlockNumber): Future[BlockObject] {. - base - gcsafe - locks: 0 - # raises: [Defect] -.} = - notImplemented - -method onDisconnect*(p: DataProviderRef, handler: DisconnectHandler) {. - base - gcsafe - locks: 0 - # raises: [] -.} = - notImplemented - -method onBlockHeaders*(p: DataProviderRef, - blockHeaderHandler: BlockHeaderHandler, - errorHandler: SubscriptionErrorHandler): Future[void] {. - base - gcsafe - locks: 0 - # raises: [] -.} = - notImplemented - -method close*(p: DataProviderRef): Future[void] {. - base - gcsafe - locks: 0 - # raises: [Defect] -.} = - notImplemented - -method fetchDepositData*(p: DataProviderRef, - fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] {. - base - gcsafe - locks: 0 - # raises: [Defect, CatchableError] -.} = - notImplemented - -method fetchBlockDetails(p: DataProviderRef, blk: Eth1Block): Future[void] {. - base - gcsafe - locks: 0 - # raises: [Defect, CatchableError] -.} = - notImplemented - -proc new*(T: type Web3DataProvider, - web3Url: string, - depositContractAddress: Address): Future[ref Web3DataProvider] {. - async - # raises: [Defect] -.} = - try: - type R = ref T - let - web3 = await newWeb3(web3Url) - ns = web3.contractSender(DepositContract, depositContractAddress) - return R(url: web3Url, web3: web3, ns: ns) - except CatchableError: - return nil - -func web3Provider*(web3Url: string): DataProviderFactory = - proc factory(depositContractAddress: Address): Future[DataProviderRef] {.async.} = - result = await Web3DataProvider.new(web3Url, depositContractAddress) - - DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory) - -method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} = +proc close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} = if p.blockHeadersSubscription != nil: await p.blockHeadersSubscription.unsubscribe() await p.web3.close() -method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] = +proc getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] = return p.web3.provider.eth_getBlockByHash(hash, false) -method getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] = +proc getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] = return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false) -proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} = +proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} = try: let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts): return 0 @@ -362,15 +269,15 @@ proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] = amount: bytes_to_uint64(array[8, byte](amount)), signature: ValidatorSig.init(array[96, byte](signature)))) -method fetchDepositData*(p: Web3DataProviderRef, - fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] - {.async, locks: 0.} = +proc fetchDepositData*(p: Web3DataProviderRef, + fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] + {.async, locks: 0.} = info "Obtaining deposit log events", fromBlock, toBlock return readJsonDeposits(await p.ns.getJsonLogs(DepositEvent, fromBlock = some blockId(fromBlock), toBlock = some blockId(toBlock))) -method fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = +proc fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = let web3Block = p.getBlockByNumber(blk.number) depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number) @@ -386,16 +293,16 @@ method fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = blk.voteData.deposit_count = depositCount blk.voteData.deposit_root = depositRoot.read.asEth2Digest -method onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {. +proc onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {. gcsafe locks: 0 # raises: [] .} = p.web3.onDisconnect = handler -method onBlockHeaders*(p: Web3DataProviderRef, - blockHeaderHandler: BlockHeaderHandler, - errorHandler: SubscriptionErrorHandler): Future[void] {. +proc onBlockHeaders*(p: Web3DataProviderRef, + blockHeaderHandler: BlockHeaderHandler, + errorHandler: SubscriptionErrorHandler): Future[void] {. async gcsafe locks: 0 @@ -456,30 +363,53 @@ proc getBlockProposalData*(m: MainchainMonitor, proc init*(T: type MainchainMonitor, db: BeaconChainDB, preset: RuntimePreset, - dataProviderFactory: DataProviderFactory, + web3Url: string, depositContractAddress: Eth1Address, - startPosition: Eth1Data): T = - T(db: db, + depositContractDeployedAt: string): Future[Result[T, string]] {.async.} = + let web3 = try: await newWeb3(web3Url) + except CatchableError as err: + return err "Failed to setup web3 connection" + let + ns = web3.contractSender(DepositContract, depositContractAddress) + dataProvider = Web3DataProviderRef(url: web3Url, web3: web3, ns: ns) + + let + previouslyPersistedTo = db.getEth1PersistedTo() + knownStart = previouslyPersistedTo.get: + # `previouslyPersistedTo` wall null, we start from scratch + let deployedAtHash = if depositContractDeployedAt.startsWith "0x": + try: BlockHash.fromHex depositContractDeployedAt + except ValueError: + return err "Invalid hex value specified for deposit-contract-block" + else: + let blockNum = try: parseBiggestUInt depositContractDeployedAt + except ValueError: + return err "Invalid nummeric value for deposit-contract-block" + try: + let blk = await dataProvider.getBlockByNumber(blockNum) + blk.hash + except CatchableError: + return err("Failed to obtain block hash for block number " & $blockNum) + Eth1Data(block_hash: deployedAtHash.asEth2Digest, deposit_count: 0) + + return ok T( + db: db, preset: preset, + dataProvider: dataProvider, depositQueue: newAsyncQueue[Eth1BlockHeader](), - dataProviderFactory: dataProviderFactory, - depositContractAddress: Address depositContractAddress, - eth1Chain: Eth1Chain(knownStart: startPosition)) + eth1Chain: Eth1Chain(knownStart: knownStart)) proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[ genesisBlock: Eth1Block, previousBlock: Eth1Block ] = - if m.eth1Chain.blocks.len == 0: - return - let followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK) * m.preset.ETH1_FOLLOW_DISTANCE var prevBlock: Eth1Block # TODO: The DB operations should be executed as a transaction here block: # TODO Begin Transaction - while true: + while m.eth1Chain.blocks.len > 0: let blk = m.eth1Chain.blocks.peekFirst if float(blk.timestamp + followDistanceInSeconds) > timeNow: break @@ -496,6 +426,8 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[ withdrawal_credentials: deposit.data.withdrawal_credentials) m.db.validatorsByKey.insert(pubkey, ValidatorIndex idx) + # TODO The len property is currently stored in memory which + # makes it unsafe in the face of failed transactions blk.knownValidatorsCount = some m.db.validators.len discard m.eth1Chain.blocks.popFirst() @@ -511,7 +443,11 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[ if prevBlock != nil: # TODO commit transaction - m.db.putEth1PersistedTo prevBlock.voteData.block_hash + m.db.putEth1PersistedTo prevBlock.voteData + m.eth1Chain.knownStart = prevBlock.voteData + notice "Eth1 sync progress", + blockNumber = prevBlock.number, + depositsProcessed = prevBlock.voteData.deposit_count # TODO Commit @@ -540,12 +476,6 @@ proc signalGenesis(m: MainchainMonitor, genesisState: BeaconStateRef) = proc findGenesisBlockInRange(m: MainchainMonitor, startBlock, endBlock: Eth1Block): Future[Eth1Block] {.async.} = - let dataProvider = await m.dataProviderFactory.new(m.depositContractAddress) - if dataProvider == nil: - error "Failed to initialize Eth1 data provider", - provider = m.dataProviderFactory.desc - raise newException(CatchableError, "Failed to initialize Eth1 data provider") - var startBlock = startBlock endBlock = endBlock @@ -559,7 +489,7 @@ proc findGenesisBlockInRange(m: MainchainMonitor, float(endBlock.number - startBlock.number) blocksToJump = max(float(MIN_GENESIS_TIME - startBlockTime) / secondsPerBlock, 1.0) candidateNumber = min(endBlock.number - 1, startBlock.number + 1) # blocksToJump.uint64) - candidateBlock = await dataProvider.getBlockByNumber(candidateNumber) + candidateBlock = await m.dataProvider.getBlockByNumber(candidateNumber) var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64, timestamp: candidateBlock.timestamp.uint64, @@ -686,7 +616,7 @@ func maxValidDeposits(eth1Chain: Eth1Chain): uint64 = 0 proc processDeposits(m: MainchainMonitor, - dataProvider: DataProviderRef) {.async.} = + dataProvider: Web3DataProviderRef) {.async.} = # ATTENTION! # Please note that this code is using a queue to guarantee the # strict serial order of processing of deposits. If we had the @@ -696,6 +626,9 @@ proc processDeposits(m: MainchainMonitor, while true: m.checkIfShouldStopMainchainMonitor() + let now = epochTime() + discard m.persistFinalizedBlocks(now) + let blk = await m.depositQueue.popFirst() m.eth1Chain.trimHeight(Eth1BlockNumber(blk.number) - 1) @@ -728,8 +661,8 @@ proc processDeposits(m: MainchainMonitor, voteData: latestEth1Data) else: template logBlockProcessed(blk) = - info "Eth1 block processed", - `block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count + debug "Eth1 block processed", + `block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count await dataProvider.fetchBlockDetails(eth1Blocks[0]) if m.eth1Chain.addSuccessorBlock(eth1Blocks[0]): @@ -767,33 +700,22 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} = if delayBeforeStart != ZeroDuration: await sleepAsync(delayBeforeStart) - let dataProvider = await m.dataProviderFactory.new(m.depositContractAddress) - if dataProvider == nil: - error "Failed to initialize Eth1 data provider", - provider = m.dataProviderFactory.desc - raise newException(CatchableError, "Failed to initialize Eth1 data provider") + info "Starting Eth1 deposit contract monitoring", + contract = $m.depositContractAddress, url = m.web3Url - try: - info "Starting Eth1 deposit contract monitoring", - contract = $m.depositContractAddress, - url = m.dataProviderFactory.desc - - await dataProvider.onBlockHeaders do (blk: Eth1BlockHeader) + await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader) {.raises: [Defect], gcsafe.}: - try: - m.depositQueue.addLastNoWait(blk) - except AsyncQueueFullError: - raiseAssert "The depositQueue has no size limit" - except Exception: - # TODO Investigate why this exception is being raised - raiseAssert "queue.addLastNoWait should not raise exceptions" - do (err: CatchableError): - debug "Error while processing Eth1 block headers subscription", err = err.msg + try: + m.depositQueue.addLastNoWait(blk) + except AsyncQueueFullError: + raiseAssert "The depositQueue has no size limit" + except Exception: + # TODO Investigate why this exception is being raised + raiseAssert "queue.addLastNoWait should not raise exceptions" + do (err: CatchableError): + debug "Error while processing Eth1 block headers subscription", err = err.msg - await m.processDeposits(dataProvider) - - finally: - await close(dataProvider) + await m.processDeposits(m.dataProvider) proc start(m: MainchainMonitor, delayBeforeStart: Duration) = if m.runFut.isNil: @@ -821,4 +743,3 @@ proc getEth1BlockHash*(url: string, blockId: RtBlockIdentifier): Future[BlockHas finally: await web3.close() -{.pop.} diff --git a/tests/test_mainchain_monitor.nim b/tests/test_mainchain_monitor.nim index ff4186d6b..9cb8692b8 100644 --- a/tests/test_mainchain_monitor.nim +++ b/tests/test_mainchain_monitor.nim @@ -5,48 +5,6 @@ import chronos, web3/ethtypes, ../beacon_chain/mainchain_monitor -type - MockDataProvider = ref object of DataProvider - - -method getBlockByHash*(p: MockDataProvider, hash: BlockHash): Future[BlockObject] {. - async - gcsafe - # raises: [Defect] -.} = - return BlockObject() - -method onDisconnect*(p: MockDataProvider, handler: DisconnectHandler) {. - async - gcsafe - # raises: [] -.} = - discard - -method onDepositEvent*(p: MockDataProvider, - startBlock: Eth1BlockNumber, - handler: DepositEventHandler): Future[void] {. - async - gcsafe - # raises: [] -.} = - discard - -method close*(p: MockDataProvider): Future[void] {. - async - gcsafe - # raises: [Defect] -.} = - discard - -method fetchDepositData*(p: MockDataProvider, - web3Block: BlockObject): Future[Eth1Block] {. - async - gcsafe - # raises: [Defect, CatchableError] -.} = - return Eth1Block() - suite "Eth1 Chain": discard