diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 3095239e9..f7c533f7f 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -98,11 +98,6 @@ OK: 2/2 Fail: 0/2 Skip: 0/2 + Mocked start private key OK ``` OK: 3/3 Fail: 0/3 Skip: 0/3 -## Mocking utilities -```diff -+ merkle_minimal OK -``` -OK: 1/1 Fail: 0/1 Skip: 0/1 ## Official - constants & config [Preset: mainnet] ```diff + BASE_REWARD_FACTOR 64 [Preset: mainnet] OK @@ -272,5 +267,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1 ---TOTAL--- -OK: 145/154 Fail: 0/154 Skip: 9/154 - +OK: 144/153 Fail: 0/153 Skip: 9/153 diff --git a/beacon_chain/attestation_pool.nim b/beacon_chain/attestation_pool.nim index b1d86ce57..5424b1ef6 100644 --- a/beacon_chain/attestation_pool.nim +++ b/beacon_chain/attestation_pool.nim @@ -9,7 +9,7 @@ import # Standard libraries - std/[algorithm, deques, sequtils, sets, tables, options], + std/[deques, sequtils, sets, tables, options], # Status libraries chronicles, stew/[byteutils], json_serialization/std/sets as jsonSets, # Internal @@ -27,9 +27,10 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin ## Initialize an AttestationPool from the chainDag `headState` ## The `finalized_root` works around the finalized_checkpoint of the genesis block ## holding a zero_root. + let finalizedEpochRef = chainDag.getFinalizedEpochRef() var forkChoice = ForkChoice.init( - chainDag.getFinalizedEpochRef(), + finalizedEpochRef, chainDag.finalizedHead.blck) # Feed fork choice with unfinalized history - during startup, block pool only diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index b26d2b89b..a67b16d6d 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -66,20 +66,20 @@ func enrForkIdFromState(state: BeaconState): ENRForkID = next_fork_version: forkVer, next_fork_epoch: FAR_FUTURE_EPOCH) -proc startMainchainMonitor(db: BeaconChainDB, - conf: BeaconNodeConf): Future[MainchainMonitor] {.async.} = - let mainchainMonitorRes = await MainchainMonitor.init( +proc startEth1Monitor(db: BeaconChainDB, + conf: BeaconNodeConf): Future[Eth1Monitor] {.async.} = + let eth1MonitorRes = await Eth1Monitor.init( db, conf.runtimePreset, conf.web3Url, conf.depositContractAddress.get, conf.depositContractDeployedAt.get) - result = if mainchainMonitorRes.isOk: - mainchainMonitorRes.get + result = if eth1MonitorRes.isOk: + eth1MonitorRes.get else: fatal "Failed to start Eth1 monitor", - reason = mainchainMonitorRes.error, + reason = eth1MonitorRes.error, web3Url = conf.web3Url, depositContractAddress = conf.depositContractAddress.get, depositContractDeployedAt = conf.depositContractDeployedAt.get @@ -98,7 +98,7 @@ proc init*(T: type BeaconNode, db = BeaconChainDB.init(conf.runtimePreset, conf.databaseDir) var - mainchainMonitor: MainchainMonitor + eth1Monitor: Eth1Monitor genesisState, checkpointState: ref BeaconState checkpointBlock: SignedBeaconBlock @@ -159,9 +159,9 @@ proc init*(T: type BeaconNode, # TODO Could move this to a separate "GenesisMonitor" process or task # that would do only this - see Paul's proposal for this. - mainchainMonitor = await startMainchainMonitor(db, conf) + eth1Monitor = await startEth1Monitor(db, conf) - genesisState = await mainchainMonitor.waitGenesis() + genesisState = await eth1Monitor.waitGenesis() if bnStatus == BeaconNodeStatus.Stopping: return nil @@ -227,13 +227,13 @@ proc init*(T: type BeaconNode, if checkpointState != nil: chainDag.setTailState(checkpointState[], checkpointBlock) - if mainchainMonitor.isNil and + if eth1Monitor.isNil and conf.web3Url.len > 0 and conf.depositContractAddress.isSome and conf.depositContractDeployedAt.isSome: # TODO if we don't have any validators attached, # we don't need a mainchain monitor - mainchainMonitor = await startMainchainMonitor(db, conf) + eth1Monitor = await startEth1Monitor(db, conf) let rpcServer = if conf.rpcEnabled: RpcServer.init(conf.rpcAddress, conf.rpcPort) @@ -259,7 +259,7 @@ proc init*(T: type BeaconNode, quarantine: quarantine, attestationPool: attestationPool, exitPool: exitPool, - mainchainMonitor: mainchainMonitor, + eth1Monitor: eth1Monitor, beaconClock: beaconClock, rpcServer: rpcServer, forkDigest: enrForkId.forkDigest, diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index c13cf21b1..191605d62 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -42,7 +42,7 @@ type quarantine*: QuarantineRef attestationPool*: ref AttestationPool exitPool*: ref ExitPool - mainchainMonitor*: MainchainMonitor + eth1Monitor*: Eth1Monitor beaconClock*: BeaconClock rpcServer*: RpcServer vcProcess*: Process diff --git a/beacon_chain/mainchain_monitor.nim b/beacon_chain/mainchain_monitor.nim index 1dc2bafa5..05c6163be 100644 --- a/beacon_chain/mainchain_monitor.nim +++ b/beacon_chain/mainchain_monitor.nim @@ -5,8 +5,6 @@ import spec/[datatypes, digest, crypto, beaconstate, helpers], ssz, beacon_chain_db, network_metadata, merkle_minimal, beacon_node_status -from times import epochTime - export web3Types @@ -40,26 +38,26 @@ type timestamp*: Eth1BlockTimestamp deposits*: seq[Deposit] voteData*: Eth1Data - knownValidatorsCount*: Option[uint64] + activeValidatorsCount*: uint64 Eth1Chain* = object knownStart: Eth1Data - knownStartBlockNum: Option[Eth1BlockNumber] - blocks: Deque[Eth1Block] blocksByHash: Table[BlockHash, Eth1Block] - MainchainMonitor* = ref object + Eth1Monitor* = ref object db: BeaconChainDB preset: RuntimePreset dataProvider: Web3DataProviderRef - depositQueue: AsyncQueue[Eth1BlockHeader] + + latestEth1BlockNumber: Eth1BlockNumber + eth1Progress: AsyncEvent + eth1Chain: Eth1Chain genesisState: NilableBeaconStateRef genesisStateFut: Future[void] - genesisMonitoringFut: Future[void] runFut: Future[void] @@ -84,11 +82,12 @@ type const web3Timeouts = 5.seconds + hasDepositRootChecks = defined(with_deposit_root_checks) -template depositContractAddress(m: MainchainMonitor): Eth1Address = +template depositContractAddress(m: Eth1Monitor): Eth1Address = m.dataProvider.ns.contractAddress -template web3Url(m: MainchainMonitor): string = +template web3Url(m: Eth1Monitor): string = m.dataProvider.url # TODO: Add preset validation @@ -126,61 +125,41 @@ template findBlock*(eth1Chain: Eth1Chain, hash: BlockHash): Eth1Block = template findBlock*(eth1Chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block = getOrDefault(eth1Chain.blocksByHash, asBlockHash(eth1Data.block_hash), nil) -proc findParent*(eth1Chain: Eth1Chain, blk: BlockObject): Eth1Block = - result = eth1Chain.findBlock(blk.parentHash) - # a distinct type is stipped here: - let blockNumber = Eth1BlockNumber(blk.number) - if result != nil and result.number != blockNumber - 1: - debug "Found inconsistent numbering of Eth1 blocks. Ignoring block.", - blockHash = blk.hash.toHex, blockNumber, - parentHash = blk.parentHash.toHex, parentNumber = result.number - result = nil - -func latestCandidateBlock(eth1Chain: Eth1Chain, preset: RuntimePreset, periodStart: uint64): Eth1Block = +func latestCandidateBlock(eth1Chain: Eth1Chain, + preset: RuntimePreset, + periodStart: uint64): Eth1Block = for i in countdown(eth1Chain.blocks.len - 1, 0): let blk = eth1Chain.blocks[i] if is_candidate_block(preset, blk, periodStart): return blk -func popBlock(eth1Chain: var Eth1Chain) = - let removed = eth1Chain.blocks.popLast +func popFirst(eth1Chain: var Eth1Chain) = + let removed = eth1Chain.blocks.popFirst eth1Chain.blocksByHash.del removed.voteData.block_hash.asBlockHash -func trimHeight(eth1Chain: var Eth1Chain, blockNumber: Eth1BlockNumber) = - ## Removes all blocks above certain `blockNumber` - while eth1Chain.blocks.len > 0: - if eth1Chain.blocks.peekLast.number > blockNumber: - eth1Chain.popBlock() - else: - break - -func addBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block) = +func addBlock(eth1Chain: var Eth1Chain, newBlock: Eth1Block) = eth1Chain.blocks.addLast newBlock eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock -proc allDepositsUpTo*(m: MainchainMonitor, totalDeposits: uint64): seq[Deposit] = - for i in 0'u64 ..< totalDeposits: - result.add Deposit(data: m.db.deposits.get(i)) - -func clear*(eth1Chain: var Eth1Chain) = - eth1Chain = default(Eth1Chain) - template hash*(x: Eth1Block): Hash = hash(x.voteData.block_hash.data) -proc close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} = +proc close*(p: Web3DataProviderRef): Future[void] {.async.} = if p.blockHeadersSubscription != nil: await p.blockHeadersSubscription.unsubscribe() await p.web3.close() -proc getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] = +proc getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): + Future[BlockObject] = return p.web3.provider.eth_getBlockByHash(hash, false) -proc getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] = +proc getBlockByNumber*(p: Web3DataProviderRef, + number: Eth1BlockNumber): Future[BlockObject] = return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false) -proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} = +proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): + Future[Eth1BlockNumber] {.async.} = try: let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts): return 0 @@ -190,14 +169,12 @@ proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): Future[Eth1BlockNu hash = $hash, err = exc.msg raise exc -template readJsonField(j: JsonNode, - fieldName: string, - ValueType: type): untyped = +template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped = var res: ValueType fromJson(j[fieldName], fieldName, res) res -proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] = +proc depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] = if depositsList.kind != JArray: raise newException(CatchableError, "Web3 provider didn't return a list of deposit events") @@ -238,39 +215,56 @@ proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] = amount: bytes_to_uint64(array[8, byte](amount)), signature: ValidatorSig.init(array[96, byte](signature)))) -proc fetchDepositData*(p: Web3DataProviderRef, - fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] - {.async, locks: 0.} = - var currentBlock = fromBlock - while currentBlock <= toBlock: - var blocksPerRequest = 128'u64 - while true: - let requestToBlock = min(toBlock, currentBlock + blocksPerRequest - 1) +proc fetchTimestamp(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = + let web3block = await p.getBlockByHash(blk.voteData.block_hash.asBlockHash) + blk.timestamp = Eth1BlockTimestamp web3block.timestamp - debug "Obtaining deposit log events", - fromBlock = currentBlock, - toBlock = requestToBlock +when hasDepositRootChecks: + type + DepositContractDataStatus = enum + Fetched + VerifiedCorrect + DepositRootIncorrect + DepositRootUnavailable + DepositCountIncorrect + DepositCountUnavailable - let depositLogs = try: - await p.ns.getJsonLogs( - DepositEvent, - fromBlock = some blockId(currentBlock), - toBlock = some blockId(requestToBlock)) - except CatchableError as err: - blocksPerRequest = blocksPerRequest div 2 - if blocksPerRequest > 0: - continue - raise err + proc fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block): + Future[DepositContractDataStatus] {.async.} = + let + depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number) + rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number) - currentBlock = requestToBlock + 1 - result.add readJsonDeposits(depositLogs) - break # breaks the inner "retry" loop and continues - # to the next range of blocks to request + try: + let fetchedRoot = asEth2Digest(await depositRoot) + if blk.voteData.deposit_root == default(Eth2Digest): + blk.voteData.deposit_root = fetchedRoot + result = Fetched + elif blk.voteData.deposit_root == fetchedRoot: + result = VerifiedCorrect + else: + result = DepositRootIncorrect + except CatchableError as err: + debug "Failed to fetch deposits root", + blockNumber = blk.number, + err = err.msg + result = DepositRootUnavailable + + try: + let fetchedCount = bytes_to_uint64(array[8, byte](await rawCount)) + if blk.voteData.deposit_count == 0: + blk.voteData.deposit_count = fetchedCount + elif blk.voteData.deposit_count != fetchedCount: + result = DepositCountIncorrect + except CatchableError as err: + debug "Failed to fetch deposits count", + blockNumber = blk.number, + err = err.msg + result = DepositCountUnavailable proc onBlockHeaders*(p: Web3DataProviderRef, blockHeaderHandler: BlockHeaderHandler, - errorHandler: SubscriptionErrorHandler): Future[void] - {.async, gcsafe.} = + errorHandler: SubscriptionErrorHandler) {.async.} = if p.blockHeadersSubscription != nil: await p.blockHeadersSubscription.unsubscribe() @@ -280,7 +274,7 @@ proc onBlockHeaders*(p: Web3DataProviderRef, blockHeaderHandler, errorHandler) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#get_eth1_data -proc getBlockProposalData*(m: MainchainMonitor, +proc getBlockProposalData*(m: Eth1Monitor, state: BeaconState, finalizedEth1Data: Eth1Data): (Eth1Data, seq[Deposit]) = # TODO To make block proposal cheaper, we can perform this action more regularly @@ -339,7 +333,7 @@ proc getBlockProposalData*(m: MainchainMonitor, swap(result[1], deposits) -proc init*(T: type MainchainMonitor, +proc init*(T: type Eth1Monitor, db: BeaconChainDB, preset: RuntimePreset, web3Url: string, @@ -375,114 +369,62 @@ proc init*(T: type MainchainMonitor, db: db, preset: preset, dataProvider: dataProvider, - depositQueue: newAsyncQueue[Eth1BlockHeader](), + eth1Progress: newAsyncEvent(), eth1Chain: Eth1Chain(knownStart: knownStart)) -proc fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = - let - web3Block = p.getBlockByNumber(blk.number) - depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number) - rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number) +proc allDepositsUpTo(m: Eth1Monitor, totalDeposits: uint64): seq[Deposit] = + for i in 0'u64 ..< totalDeposits: + result.add Deposit(data: m.db.deposits.get(i)) - var error: ref CatchableError = nil - - try: blk.voteData.deposit_root = asEth2Digest(await depositRoot) - except CatchableError as err: error = err - - try: blk.voteData.deposit_count = bytes_to_uint64(array[8, byte](await rawCount)) - except CatchableError as err: error = err - - if error != nil: - debug "Deposit contract data not available", - blockNumber = blk.number, - err = error.msg - - blk.timestamp = Eth1BlockTimestamp (await web3Block).timestamp - -proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): - Future[tuple[genesisBlock: Eth1Block, previousBlock: Eth1Block]] {.async.} = - - let followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK) * - m.preset.ETH1_FOLLOW_DISTANCE - var prevBlock: Eth1Block - - # TODO: The DB operations should be executed as a transaction here - block: # TODO Begin Transaction - while m.eth1Chain.blocks.len > 0: - # TODO keep a separate set of candidate blocks - var blk = m.eth1Chain.blocks.peekFirst - if blk.timestamp != 0: - await fetchBlockDetails(m.dataProvider, blk) - - if float(blk.timestamp + followDistanceInSeconds) > timeNow: - break - - for deposit in blk.deposits: - m.db.processDeposit(deposit.data) - - # TODO compare our results against the web3 provider - blk.voteData.deposit_count = m.db.finalizedEth1DepositsMerkleizer.totalChunks - blk.voteData.deposit_root = m.db.finalizedEth1DepositsMerkleizer.getFinalHash - - # TODO - # Try to confirm history by obtaining deposit_root from a more - # recent block - - # TODO The len property is currently stored in memory which - # makes it unsafe in the face of failed transactions - let activeValidatorsCount = m.db.immutableValidatorData.lenu64 - blk.knownValidatorsCount = some activeValidatorsCount - - discard m.eth1Chain.blocks.popFirst() - m.eth1Chain.blocksByHash.del blk.voteData.block_hash.asBlockHash - - let blockGenesisTime = genesis_time_from_eth1_timestamp(m.preset, - blk.timestamp) - if blockGenesisTime >= m.preset.MIN_GENESIS_TIME and - activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: - result = (blk, prevBlock) - - prevBlock = blk - - if prevBlock != nil: - # TODO commit transaction - m.db.putEth1PersistedTo prevBlock.voteData - m.eth1Chain.knownStart = prevBlock.voteData - notice "Eth1 sync progress", - blockNumber = prevBlock.number, - depositsProcessed = prevBlock.voteData.deposit_count - - # TODO Commit - -proc createGenesisState(m: MainchainMonitor, eth1Block: Eth1Block): BeaconStateRef = +proc createGenesisState(m: Eth1Monitor, eth1Block: Eth1Block): BeaconStateRef = notice "Generating genesis state", blockNum = eth1Block.number, blockHash = eth1Block.voteData.block_hash, + blockTimestamp = eth1Block.timestamp, totalDeposits = eth1Block.voteData.deposit_count, - activeValidators = eth1Block.knownValidatorsCount.get + activeValidators = eth1Block.activeValidatorsCount var deposits = m.allDepositsUpTo(eth1Block.voteData.deposit_count) attachMerkleProofs deposits - initialize_beacon_state_from_eth1(m.preset, - eth1Block.voteData.block_hash, - eth1Block.timestamp.uint64, - deposits, {}) + result = initialize_beacon_state_from_eth1( + m.preset, + eth1Block.voteData.block_hash, + eth1Block.timestamp.uint64, + deposits, {}) -proc signalGenesis(m: MainchainMonitor, genesisState: BeaconStateRef) = + doAssert result.validators.lenu64 == eth1Block.activeValidatorsCount + +proc signalGenesis(m: Eth1Monitor, genesisState: BeaconStateRef) = m.genesisState = genesisState if not m.genesisStateFut.isNil: m.genesisStateFut.complete() m.genesisStateFut = nil -proc findGenesisBlockInRange(m: MainchainMonitor, - startBlock, endBlock: Eth1Block): Future[Eth1Block] - {.async.} = +template hasEnoughValidators(m: Eth1Monitor, blk: Eth1Block): bool = + blk.activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT + +func isAfterMinGenesisTime(m: Eth1Monitor, blk: Eth1Block): bool = + doAssert blk.timestamp != 0 + let t = genesis_time_from_eth1_timestamp(m.preset, uint64 blk.timestamp) + t >= m.preset.MIN_GENESIS_TIME + +func isGenesisCandidate(m: Eth1Monitor, blk: Eth1Block): bool = + m.hasEnoughValidators(blk) and m.isAfterMinGenesisTime(blk) + +proc findGenesisBlockInRange(m: Eth1Monitor, startBlock, endBlock: Eth1Block): + Future[Eth1Block] {.async.} = + doAssert startBlock.timestamp != 0 and not m.isAfterMinGenesisTime(startBlock) + doAssert endBlock.timestamp != 0 and m.isAfterMinGenesisTime(endBlock) + doAssert m.hasEnoughValidators(startBlock) + doAssert m.hasEnoughValidators(endBlock) + var startBlock = startBlock endBlock = endBlock depositData = startBlock.voteData + activeValidatorsCountDuringRange = startBlock.activeValidatorsCount while startBlock.number + 1 < endBlock.number: let @@ -491,7 +433,7 @@ proc findGenesisBlockInRange(m: MainchainMonitor, secondsPerBlock = float(endBlock.timestamp - startBlock.timestamp) / float(endBlock.number - startBlock.number) blocksToJump = max(float(MIN_GENESIS_TIME - startBlockTime) / secondsPerBlock, 1.0) - candidateNumber = min(endBlock.number - 1, startBlock.number + 1) # blocksToJump.uint64) + candidateNumber = min(endBlock.number - 1, startBlock.number + blocksToJump.uint64) candidateBlock = await m.dataProvider.getBlockByNumber(candidateNumber) var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64, @@ -511,151 +453,173 @@ proc findGenesisBlockInRange(m: MainchainMonitor, else: endBlock = candidateAsEth1Block + if endBlock.activeValidatorsCount == 0: + endBlock.activeValidatorsCount = activeValidatorsCountDuringRange + return endBlock proc safeCancel(fut: var Future[void]) = if not fut.isNil and not fut.finished: fut.cancel() - fut = nil + fut = nil -proc stop*(m: MainchainMonitor) = +proc stop*(m: Eth1Monitor) = safeCancel m.runFut - safeCancel m.genesisMonitoringFut -template checkIfShouldStopMainchainMonitor(m: MainchainMonitor) = - if bnStatus == BeaconNodeStatus.Stopping: - if not m.genesisStateFut.isNil: - m.genesisStateFut.complete() - m.genesisStateFut = nil - m.stop - return - -proc checkForGenesisLoop(m: MainchainMonitor) {.async.} = - while true: - m.checkIfShouldStopMainchainMonitor() - - if not m.genesisState.isNil: - return - - try: - # TODO: check for a stale monitor - let - now = epochTime() - (genesisCandidate, genesisParent) = await m.persistFinalizedBlocks(now) - - if genesisCandidate != nil: - # We have a candidate state on our hands, but our current Eth1Chain - # may consist only of blocks that have deposits attached to them - # while the real genesis may have happened in a block without any - # deposits (triggered by MIN_GENESIS_TIME). - # - # This can happen when the beacon node is launched after the genesis - # event. We take a short cut when constructing the initial Eth1Chain - # by downloading only deposit log entries. Thus, we'll see all the - # blocks with deposits, but not the regular blocks in between. - # - # We'll handle this special case below by examing whether we are in - # this potential scenario and we'll use a fast guessing algorith to - # discover the ETh1 block with minimal valid genesis time. - if genesisParent != nil: - if genesisParent.knownValidatorsCount.get >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and - genesisParent.number - genesisParent.number > 1: - let genesisBlock = await m.findGenesisBlockInRange(genesisParent, genesisCandidate) - if genesisBlock.number != genesisCandidate.number: - m.signalGenesis m.createGenesisState(genesisBlock) - return - - let candidateState = m.createGenesisState(genesisCandidate) - m.signalGenesis candidateState - return - - except CatchableError as err: - debug "Unexpected error in checkForGenesisLoop", err = err.msg - - await sleepAsync(1.seconds) - -proc waitGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} = +proc waitGenesis*(m: Eth1Monitor): Future[BeaconStateRef] {.async.} = if m.genesisState.isNil: if m.genesisStateFut.isNil: m.genesisStateFut = newFuture[void]("waitGenesis") - m.genesisMonitoringFut = m.checkForGenesisLoop() + info "Waiting genesis state" await m.genesisStateFut m.genesisStateFut = nil - if bnStatus == BeaconNodeStatus.Stopping: - return new BeaconStateRef # cannot return nil... - if m.genesisState != nil: return m.genesisState else: - result = new BeaconStateRef # make the compiler happy - raiseAssert "Unreachable code" + doAssert bnStatus == BeaconNodeStatus.Stopping + return new BeaconStateRef # cannot return nil... -func totalNonFinalizedBlocks(eth1Chain: Eth1Chain): Natural = - # TODO: implement this precisely - eth1Chain.blocks.len +proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async.} = + var currentBlock = fromBlock + while currentBlock <= toBlock: + var depositLogs: JsonNode = nil -func latestEth1Data(eth1Chain: Eth1Chain): Eth1Data = - if eth1Chain.blocks.len > 0: - eth1Chain.blocks[^1].voteData - else: - eth1Chain.knownStart + var blocksPerRequest = 5000'u64 # This is roughly a day of Eth1 blocks + while true: + let requestToBlock = min(toBlock, currentBlock + blocksPerRequest - 1) -func knownInvalidDepositsCount(eth1Chain: Eth1Chain): uint64 = - for i in countdown(eth1Chain.blocks.len - 1, 0): - let blk = eth1Chain.blocks[i] - if blk.knownValidatorsCount.isSome: - return blk.voteData.deposit_count - blk.knownValidatorsCount.get + debug "Obtaining deposit log events", + fromBlock = currentBlock, + toBlock = requestToBlock + try: + depositLogs = await m.dataProvider.ns.getJsonLogs( + DepositEvent, + fromBlock = some blockId(currentBlock), + toBlock = some blockId(requestToBlock)) + currentBlock = requestToBlock + 1 + break + except CatchableError as err: + blocksPerRequest = blocksPerRequest div 2 + if blocksPerRequest == 0: + raise err - return 0 + let eth1Blocks = depositEventsToBlocks(depositLogs) -func maxValidDeposits(eth1Chain: Eth1Chain): uint64 = - if eth1Chain.blocks.len > 0: - let lastBlock = eth1Chain.blocks[^1] - lastBlock.knownValidatorsCount.get( - lastBlock.voteData.deposit_count - eth1Chain.knownInvalidDepositsCount) - else: - 0 + for i in 0 ..< eth1Blocks.len: + # TODO: The DB operations should be executed as a transaction here + let blk = eth1Blocks[i] -proc processDeposits(m: MainchainMonitor, - dataProvider: Web3DataProviderRef) {.async.} = + for deposit in blk.deposits: + m.db.processDeposit(deposit.data) + + blk.voteData.deposit_count = + m.db.finalizedEth1DepositsMerkleizer.totalChunks + + blk.voteData.deposit_root = mixInLength( + m.db.finalizedEth1DepositsMerkleizer.getFinalHash, + int blk.voteData.deposit_count) + + blk.activeValidatorsCount = m.db.immutableValidatorData.lenu64 + + m.eth1Chain.addBlock blk + + if eth1Blocks.len > 0: + let lastBlock = eth1Blocks[^1] + + when hasDepositRootChecks: + let status = await m.dataProvider.fetchDepositContractData(lastBlock) + debug "Deposit root checks", status, + ourCount = lastBlock.voteData.deposit_count, + ourRoot = lastBlock.voteData.deposit_root + + m.db.putEth1PersistedTo lastBlock.voteData + + notice "Eth1 sync progress", + blockNumber = lastBlock.number, + depositsProcessed = lastBlock.voteData.deposit_count + + if m.genesisStateFut != nil and m.hasEnoughValidators(lastBlock): + await m.dataProvider.fetchTimestamp(lastBlock) + if m.isAfterMinGenesisTime(lastBlock): + var genesisBlockIdx = m.eth1Chain.blocks.len - 1 + for i in 1 ..< eth1Blocks.len: + let idx = (m.eth1Chain.blocks.len - 1) - i + let blk = m.eth1Chain.blocks[idx] + await m.dataProvider.fetchTimestamp(blk) + if m.isGenesisCandidate(blk): + genesisBlockIdx = idx + else: + break + # We have a candidate state on our hands, but our current Eth1Chain + # may consist only of blocks that have deposits attached to them + # while the real genesis may have happened in a block without any + # deposits (triggered by MIN_GENESIS_TIME). + # + # This can happen when the beacon node is launched after the genesis + # event. We take a short cut when constructing the initial Eth1Chain + # by downloading only deposit log entries. Thus, we'll see all the + # blocks with deposits, but not the regular blocks in between. + # + # We'll handle this special case below by examing whether we are in + # this potential scenario and we'll use a fast guessing algorith to + # discover the ETh1 block with minimal valid genesis time. + var genesisBlock = m.eth1Chain.blocks[genesisBlockIdx] + if genesisBlockIdx > 0: + let genesisParent = m.eth1Chain.blocks[genesisBlockIdx - 1] + if genesisParent.timestamp == 0: + await m.dataProvider.fetchTimestamp(genesisParent) + if m.hasEnoughValidators(genesisParent) and + genesisBlock.number - genesisParent.number > 1: + genesisBlock = await m.findGenesisBlockInRange(genesisParent, + genesisBlock) + m.signalGenesis m.createGenesisState(genesisBlock) + +proc handleEth1Progress(m: Eth1Monitor) {.async.} = # ATTENTION! - # Please note that this code is using a queue to guarantee the - # strict serial order of processing of deposits. If we had the - # same code embedded in the deposit contracts events handler, + # Please note that this code is using an async event in order + # to guarantee the strict serial order of processing of deposits. + # If we had the same code embedded in the new block headers event, # it could easily re-order the steps due to the interruptible # interleaved execution of async code. + var eth1SyncedTo = await m.dataProvider.getBlockNumber( + m.eth1Chain.knownStart.block_hash.asBlockHash) + while true: - m.checkIfShouldStopMainchainMonitor() + if bnStatus == BeaconNodeStatus.Stopping: + if not m.genesisStateFut.isNil: + m.genesisStateFut.complete() + m.genesisStateFut = nil + m.stop() + return - let now = epochTime() - discard await m.persistFinalizedBlocks(now) + await m.eth1Progress.wait() + m.eth1Progress.clear() - let blk = await m.depositQueue.popFirst() - m.eth1Chain.trimHeight(Eth1BlockNumber(blk.number) - 1) + if m.latestEth1BlockNumber <= m.preset.ETH1_FOLLOW_DISTANCE: + continue - let latestKnownBlock = if m.eth1Chain.blocks.len > 0: - m.eth1Chain.blocks[^1].number - elif m.eth1Chain.knownStartBlockNum.isSome: - m.eth1Chain.knownStartBlockNum.get - else: - m.eth1Chain.knownStartBlockNum = some( - await dataProvider.getBlockNumber(m.eth1Chain.knownStart.block_hash.asBlockHash)) - m.eth1Chain.knownStartBlockNum.get + let targetBlock = m.latestEth1BlockNumber - m.preset.ETH1_FOLLOW_DISTANCE + if targetBlock <= eth1SyncedTo: + continue - let eth1Blocks = await dataProvider.fetchDepositData(latestKnownBlock + 1, - Eth1BlockNumber blk.number) - for i in 0 ..< eth1Blocks.len: - m.eth1Chain.addBlock eth1Blocks[i] + await m.syncBlockRange(eth1SyncedTo + 1, targetBlock) + eth1SyncedTo = targetBlock -proc isRunning*(m: MainchainMonitor): bool = - not m.runFut.isNil + while m.eth1Chain.blocks.len > 0: + # We'll clean old blocks that can no longer be voting candidates. + # Technically, we should check that the block is outside of the current + # voting period as determined by its timestamp, but we'll approximate + # this by requiring a much larger difference in block numbers. + # (i.e. twice the follow distance). + let earliestBlock = m.eth1Chain.blocks.peekFirst + if earliestBlock.number < targetBlock and + targetBlock - earliestBlock.number < m.preset.ETH1_FOLLOW_DISTANCE * 2: + break + m.eth1Chain.popFirst() -func `===`(json: JsonNode, boolean: bool): bool = - json.kind == JBool and json.bval == boolean - -proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} = +proc run(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = if delayBeforeStart != ZeroDuration: await sleepAsync(delayBeforeStart) @@ -665,18 +629,18 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} = await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader) {.raises: [Defect], gcsafe.}: try: - m.depositQueue.addLastNoWait(blk) - except AsyncQueueFullError: - raiseAssert "The depositQueue has no size limit" + if blk.number.uint64 > m.latestEth1BlockNumber: + m.latestEth1BlockNumber = blk.number.uint64 + m.eth1Progress.fire() except Exception: # TODO Investigate why this exception is being raised - raiseAssert "queue.addLastNoWait should not raise exceptions" + raiseAssert "AsyncEvent.fire should not raise exceptions" do (err: CatchableError): debug "Error while processing Eth1 block headers subscription", err = err.msg - await m.processDeposits(m.dataProvider) + await m.handleEth1Progress() -proc start(m: MainchainMonitor, delayBeforeStart: Duration) = +proc start(m: Eth1Monitor, delayBeforeStart: Duration) = if m.runFut.isNil: let runFut = m.run(delayBeforeStart) m.runFut = runFut @@ -691,7 +655,7 @@ proc start(m: MainchainMonitor, delayBeforeStart: Duration) = fatal "Fatal exception reached", err = runFut.error.msg quit 1 -proc start*(m: MainchainMonitor) {.inline.} = +proc start*(m: Eth1Monitor) {.inline.} = m.start(0.seconds) proc getEth1BlockHash*(url: string, blockId: RtBlockIdentifier): Future[BlockHash] {.async.} = diff --git a/beacon_chain/merkle_minimal.nim b/beacon_chain/merkle_minimal.nim index a846d34d7..8b88206b1 100644 --- a/beacon_chain/merkle_minimal.nim +++ b/beacon_chain/merkle_minimal.nim @@ -21,7 +21,7 @@ import # TODO All tests need to be moved to the test suite. -const depositContractLimit* = Limit(1'u64 shl (DEPOSIT_CONTRACT_TREE_DEPTH - 1'u64)) +const depositContractLimit* = Limit(1'u64 shl DEPOSIT_CONTRACT_TREE_DEPTH) func attachMerkleProofs*(deposits: var openArray[Deposit]) = let depositsRoots = mapIt(deposits, hash_tree_root(it.data)) @@ -30,6 +30,7 @@ func attachMerkleProofs*(deposits: var openArray[Deposit]) = for i in 0 ..< depositsRoots.len: incrementalMerkleProofs.addChunkAndGenMerkleProof(depositsRoots[i], deposits[i].proof) + deposits[i].proof[32] = default(Eth2Digest) deposits[i].proof[32].data[0..7] = toBytesLE uint64(i + 1) template getProof*(proofs: seq[Eth2Digest], idxParam: int): openArray[Eth2Digest] = diff --git a/beacon_chain/ssz/merkleization.nim b/beacon_chain/ssz/merkleization.nim index b9f471686..94e1d0f75 100644 --- a/beacon_chain/ssz/merkleization.nim +++ b/beacon_chain/ssz/merkleization.nim @@ -28,13 +28,20 @@ const zero64 = default array[64, byte] bitsPerChunk = bytesPerChunk * 8 +func binaryTreeHeight*(totalElements: Limit): int = + bitWidth nextPow2(uint64 totalElements) + type - SszChunksMerkleizer* {.requiresInit.} = object + SszMerkleizerImpl = object combinedChunks: ptr UncheckedArray[Eth2Digest] totalChunks: uint64 topIndex: int -template chunks*(m: SszChunksMerkleizer): openArray[Eth2Digest] = + SszMerkleizer*[limit: static[Limit]] = object + combinedChunks: ref array[binaryTreeHeight limit, Eth2Digest] + impl: SszMerkleizerImpl + +template chunks*(m: SszMerkleizerImpl): openArray[Eth2Digest] = m.combinedChunks.toOpenArray(0, m.topIndex) func digest(a, b: openArray[byte]): Eth2Digest = @@ -77,7 +84,7 @@ func computeZeroHashes: array[sizeof(Limit) * 8, Eth2Digest] = const zeroHashes* = computeZeroHashes() -func addChunk*(merkleizer: var SszChunksMerkleizer, data: openArray[byte]) = +func addChunk*(merkleizer: var SszMerkleizerImpl, data: openArray[byte]) = doAssert data.len > 0 and data.len <= bytesPerChunk if getBitLE(merkleizer.totalChunks, 0): @@ -107,7 +114,7 @@ func addChunk*(merkleizer: var SszChunksMerkleizer, data: openArray[byte]) = template isOdd(x: SomeNumber): bool = (x and 1) != 0 -func addChunkAndGenMerkleProof*(merkleizer: var SszChunksMerkleizer, +func addChunkAndGenMerkleProof*(merkleizer: var SszMerkleizerImpl, hash: Eth2Digest, outProof: var openArray[Eth2Digest]) = var @@ -129,7 +136,7 @@ func addChunkAndGenMerkleProof*(merkleizer: var SszChunksMerkleizer, merkleizer.totalChunks += 1 -func completeStartedChunk(merkleizer: var SszChunksMerkleizer, +func completeStartedChunk(merkleizer: var SszMerkleizerImpl, hash: Eth2Digest, atLevel: int) = when false: let @@ -145,7 +152,7 @@ func completeStartedChunk(merkleizer: var SszChunksMerkleizer, merkleizer.combinedChunks[i] = hash break -func addChunksAndGenMerkleProofs*(merkleizer: var SszChunksMerkleizer, +func addChunksAndGenMerkleProofs*(merkleizer: var SszMerkleizerImpl, chunks: openArray[Eth2Digest]): seq[Eth2Digest] = doAssert chunks.len > 0 and merkleizer.topIndex > 0 @@ -283,17 +290,9 @@ func addChunksAndGenMerkleProofs*(merkleizer: var SszChunksMerkleizer, merkleizer.totalChunks = newTotalChunks -func binaryTreeHeight*(totalElements: Limit): int = - bitWidth nextPow2(uint64 totalElements) - -type - SszMerkleizer*[limit: static[Limit]] = object - combinedChunks: ref array[binaryTreeHeight limit, Eth2Digest] - m: SszChunksMerkleizer - proc init*(S: type SszMerkleizer): S = new result.combinedChunks - result.m = SszChunksMerkleizer( + result.impl = SszMerkleizerImpl( combinedChunks: cast[ptr UncheckedArray[Eth2Digest]]( addr result.combinedChunks[][0]), topIndex: binaryTreeHeight(result.limit) - 1, @@ -304,7 +303,7 @@ proc init*(S: type SszMerkleizer, totalChunks: uint64): S = new result.combinedChunks result.combinedChunks[][0 ..< combinedChunks.len] = combinedChunks - result.m = SszChunksMerkleizer( + result.impl = SszMerkleizerImpl( combinedChunks: cast[ptr UncheckedArray[Eth2Digest]]( addr result.combinedChunks[][0]), topIndex: binaryTreeHeight(result.limit) - 1, @@ -313,7 +312,7 @@ proc init*(S: type SszMerkleizer, proc clone*[L: static[Limit]](cloned: SszMerkleizer[L]): SszMerkleizer[L] = new result.combinedChunks result.combinedChunks[] = cloned.combinedChunks[] - result.m = SszChunksMerkleizer( + result.impl = SszMerkleizerImpl( combinedChunks: cast[ptr UncheckedArray[Eth2Digest]]( addr result.combinedChunks[][0]), topIndex: binaryTreeHeight(L) - 1, @@ -322,29 +321,29 @@ proc clone*[L: static[Limit]](cloned: SszMerkleizer[L]): SszMerkleizer[L] = template addChunksAndGenMerkleProofs*( merkleizer: var SszMerkleizer, chunks: openArray[Eth2Digest]): seq[Eth2Digest] = - addChunksAndGenMerkleProofs(merkleizer.m, chunks) + addChunksAndGenMerkleProofs(merkleizer.impl, chunks) template addChunk*(merkleizer: var SszMerkleizer, data: openArray[byte]) = - addChunk(merkleizer.m, data) + addChunk(merkleizer.impl, data) template totalChunks*(merkleizer: SszMerkleizer): uint64 = - merkleizer.m.totalChunks + merkleizer.impl.totalChunks template getFinalHash*(merkleizer: SszMerkleizer): Eth2Digest = - merkleizer.m.getFinalHash + merkleizer.impl.getFinalHash -template createMerkleizer*(totalElements: static Limit): SszChunksMerkleizer = +template createMerkleizer*(totalElements: static Limit): SszMerkleizerImpl = trs "CREATING A MERKLEIZER FOR ", totalElements const treeHeight = binaryTreeHeight totalElements var combinedChunks {.noInit.}: array[treeHeight, Eth2Digest] - SszChunksMerkleizer( + SszMerkleizerImpl( combinedChunks: cast[ptr UncheckedArray[Eth2Digest]](addr combinedChunks), topIndex: treeHeight - 1, totalChunks: 0) -func getFinalHash*(merkleizer: SszChunksMerkleizer): Eth2Digest = +func getFinalHash*(merkleizer: SszMerkleizerImpl): Eth2Digest = if merkleizer.totalChunks == 0: return zeroHashes[merkleizer.topIndex] @@ -383,7 +382,7 @@ func getFinalHash*(merkleizer: SszChunksMerkleizer): Eth2Digest = for i in bottomHashIdx + 1 ..< topHashIdx: result = mergeBranches(result, zeroHashes[i]) -func mixInLength(root: Eth2Digest, length: int): Eth2Digest = +func mixInLength*(root: Eth2Digest, length: int): Eth2Digest = var dataLen: array[32, byte] dataLen[0..<8] = uint64(length).toBytesLE() mergeBranches(root, dataLen) @@ -408,7 +407,7 @@ template writeBytesLE(chunk: var array[bytesPerChunk, byte], atParam: int, let at = atParam chunk[at ..< at + sizeof(val)] = toBytesLE(val) -func chunkedHashTreeRootForBasicTypes[T](merkleizer: var SszChunksMerkleizer, +func chunkedHashTreeRootForBasicTypes[T](merkleizer: var SszMerkleizerImpl, arr: openArray[T]): Eth2Digest = static: doAssert T is BasicType @@ -460,7 +459,7 @@ func chunkedHashTreeRootForBasicTypes[T](merkleizer: var SszChunksMerkleizer, getFinalHash(merkleizer) -func bitListHashTreeRoot(merkleizer: var SszChunksMerkleizer, x: BitSeq): Eth2Digest = +func bitListHashTreeRoot(merkleizer: var SszMerkleizerImpl, x: BitSeq): Eth2Digest = # TODO: Switch to a simpler BitList representation and # replace this with `chunkedHashTreeRoot` trs "CHUNKIFYING BIT SEQ WITH TOP INDEX ", merkleizer.topIndex diff --git a/beacon_chain/validator_duties.nim b/beacon_chain/validator_duties.nim index 8c8cf4af6..3ad6142da 100644 --- a/beacon_chain/validator_duties.nim +++ b/beacon_chain/validator_duties.nim @@ -185,11 +185,11 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, # Advance state to the slot that we're proposing for node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)): let (eth1data, deposits) = - if node.mainchainMonitor.isNil: + if node.eth1Monitor.isNil: (state.eth1_data, newSeq[Deposit]()) else: let finalizedEth1Data = node.chainDag.getFinalizedEpochRef().eth1_data - node.mainchainMonitor.getBlockProposalData(state, finalizedEth1Data) + node.eth1Monitor.getBlockProposalData(state, finalizedEth1Data) let poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived diff --git a/tests/test_ssz_merkleization.nim b/tests/test_ssz_merkleization.nim index 33ce175f5..d38fba32f 100644 --- a/tests/test_ssz_merkleization.nim +++ b/tests/test_ssz_merkleization.nim @@ -227,6 +227,7 @@ func attachMerkleProofsReferenceImpl(deposits: var openArray[Deposit]) = for val_idx in 0 ..< deposits.len: deposits[val_idx].proof[0..31] = merkle_tree.getMerkleProof(val_idx, true) + deposits[val_idx].proof[32] = default(Eth2Digest) deposits[val_idx].proof[32].data[0..7] = uint_to_bytes8((val_idx + 1).uint64) doAssert is_valid_merkle_branch(