Further reduce the number of RPC requests; Significantly faster ETH1 syncing

This commit is contained in:
Zahary Karadjov 2020-11-03 03:21:07 +02:00 committed by zah
parent e414d964f0
commit 2f29e3e7f3
9 changed files with 293 additions and 333 deletions

View File

@ -98,11 +98,6 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
+ Mocked start private key OK
```
OK: 3/3 Fail: 0/3 Skip: 0/3
## Mocking utilities
```diff
+ merkle_minimal OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Official - constants & config [Preset: mainnet]
```diff
+ BASE_REWARD_FACTOR 64 [Preset: mainnet] OK
@ -272,5 +267,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
OK: 145/154 Fail: 0/154 Skip: 9/154
OK: 144/153 Fail: 0/153 Skip: 9/153

View File

@ -9,7 +9,7 @@
import
# Standard libraries
std/[algorithm, deques, sequtils, sets, tables, options],
std/[deques, sequtils, sets, tables, options],
# Status libraries
chronicles, stew/[byteutils], json_serialization/std/sets as jsonSets,
# Internal
@ -27,9 +27,10 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin
## Initialize an AttestationPool from the chainDag `headState`
## The `finalized_root` works around the finalized_checkpoint of the genesis block
## holding a zero_root.
let finalizedEpochRef = chainDag.getFinalizedEpochRef()
var forkChoice = ForkChoice.init(
chainDag.getFinalizedEpochRef(),
finalizedEpochRef,
chainDag.finalizedHead.blck)
# Feed fork choice with unfinalized history - during startup, block pool only

View File

@ -66,20 +66,20 @@ func enrForkIdFromState(state: BeaconState): ENRForkID =
next_fork_version: forkVer,
next_fork_epoch: FAR_FUTURE_EPOCH)
proc startMainchainMonitor(db: BeaconChainDB,
conf: BeaconNodeConf): Future[MainchainMonitor] {.async.} =
let mainchainMonitorRes = await MainchainMonitor.init(
proc startEth1Monitor(db: BeaconChainDB,
conf: BeaconNodeConf): Future[Eth1Monitor] {.async.} =
let eth1MonitorRes = await Eth1Monitor.init(
db,
conf.runtimePreset,
conf.web3Url,
conf.depositContractAddress.get,
conf.depositContractDeployedAt.get)
result = if mainchainMonitorRes.isOk:
mainchainMonitorRes.get
result = if eth1MonitorRes.isOk:
eth1MonitorRes.get
else:
fatal "Failed to start Eth1 monitor",
reason = mainchainMonitorRes.error,
reason = eth1MonitorRes.error,
web3Url = conf.web3Url,
depositContractAddress = conf.depositContractAddress.get,
depositContractDeployedAt = conf.depositContractDeployedAt.get
@ -98,7 +98,7 @@ proc init*(T: type BeaconNode,
db = BeaconChainDB.init(conf.runtimePreset, conf.databaseDir)
var
mainchainMonitor: MainchainMonitor
eth1Monitor: Eth1Monitor
genesisState, checkpointState: ref BeaconState
checkpointBlock: SignedBeaconBlock
@ -159,9 +159,9 @@ proc init*(T: type BeaconNode,
# TODO Could move this to a separate "GenesisMonitor" process or task
# that would do only this - see Paul's proposal for this.
mainchainMonitor = await startMainchainMonitor(db, conf)
eth1Monitor = await startEth1Monitor(db, conf)
genesisState = await mainchainMonitor.waitGenesis()
genesisState = await eth1Monitor.waitGenesis()
if bnStatus == BeaconNodeStatus.Stopping:
return nil
@ -227,13 +227,13 @@ proc init*(T: type BeaconNode,
if checkpointState != nil:
chainDag.setTailState(checkpointState[], checkpointBlock)
if mainchainMonitor.isNil and
if eth1Monitor.isNil and
conf.web3Url.len > 0 and
conf.depositContractAddress.isSome and
conf.depositContractDeployedAt.isSome:
# TODO if we don't have any validators attached,
# we don't need a mainchain monitor
mainchainMonitor = await startMainchainMonitor(db, conf)
eth1Monitor = await startEth1Monitor(db, conf)
let rpcServer = if conf.rpcEnabled:
RpcServer.init(conf.rpcAddress, conf.rpcPort)
@ -259,7 +259,7 @@ proc init*(T: type BeaconNode,
quarantine: quarantine,
attestationPool: attestationPool,
exitPool: exitPool,
mainchainMonitor: mainchainMonitor,
eth1Monitor: eth1Monitor,
beaconClock: beaconClock,
rpcServer: rpcServer,
forkDigest: enrForkId.forkDigest,

View File

@ -42,7 +42,7 @@ type
quarantine*: QuarantineRef
attestationPool*: ref AttestationPool
exitPool*: ref ExitPool
mainchainMonitor*: MainchainMonitor
eth1Monitor*: Eth1Monitor
beaconClock*: BeaconClock
rpcServer*: RpcServer
vcProcess*: Process

View File

@ -5,8 +5,6 @@ import
spec/[datatypes, digest, crypto, beaconstate, helpers],
ssz, beacon_chain_db, network_metadata, merkle_minimal, beacon_node_status
from times import epochTime
export
web3Types
@ -40,26 +38,26 @@ type
timestamp*: Eth1BlockTimestamp
deposits*: seq[Deposit]
voteData*: Eth1Data
knownValidatorsCount*: Option[uint64]
activeValidatorsCount*: uint64
Eth1Chain* = object
knownStart: Eth1Data
knownStartBlockNum: Option[Eth1BlockNumber]
blocks: Deque[Eth1Block]
blocksByHash: Table[BlockHash, Eth1Block]
MainchainMonitor* = ref object
Eth1Monitor* = ref object
db: BeaconChainDB
preset: RuntimePreset
dataProvider: Web3DataProviderRef
depositQueue: AsyncQueue[Eth1BlockHeader]
latestEth1BlockNumber: Eth1BlockNumber
eth1Progress: AsyncEvent
eth1Chain: Eth1Chain
genesisState: NilableBeaconStateRef
genesisStateFut: Future[void]
genesisMonitoringFut: Future[void]
runFut: Future[void]
@ -84,11 +82,12 @@ type
const
web3Timeouts = 5.seconds
hasDepositRootChecks = defined(with_deposit_root_checks)
template depositContractAddress(m: MainchainMonitor): Eth1Address =
template depositContractAddress(m: Eth1Monitor): Eth1Address =
m.dataProvider.ns.contractAddress
template web3Url(m: MainchainMonitor): string =
template web3Url(m: Eth1Monitor): string =
m.dataProvider.url
# TODO: Add preset validation
@ -126,61 +125,41 @@ template findBlock*(eth1Chain: Eth1Chain, hash: BlockHash): Eth1Block =
template findBlock*(eth1Chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block =
getOrDefault(eth1Chain.blocksByHash, asBlockHash(eth1Data.block_hash), nil)
proc findParent*(eth1Chain: Eth1Chain, blk: BlockObject): Eth1Block =
result = eth1Chain.findBlock(blk.parentHash)
# a distinct type is stipped here:
let blockNumber = Eth1BlockNumber(blk.number)
if result != nil and result.number != blockNumber - 1:
debug "Found inconsistent numbering of Eth1 blocks. Ignoring block.",
blockHash = blk.hash.toHex, blockNumber,
parentHash = blk.parentHash.toHex, parentNumber = result.number
result = nil
func latestCandidateBlock(eth1Chain: Eth1Chain, preset: RuntimePreset, periodStart: uint64): Eth1Block =
func latestCandidateBlock(eth1Chain: Eth1Chain,
preset: RuntimePreset,
periodStart: uint64): Eth1Block =
for i in countdown(eth1Chain.blocks.len - 1, 0):
let blk = eth1Chain.blocks[i]
if is_candidate_block(preset, blk, periodStart):
return blk
func popBlock(eth1Chain: var Eth1Chain) =
let removed = eth1Chain.blocks.popLast
func popFirst(eth1Chain: var Eth1Chain) =
let removed = eth1Chain.blocks.popFirst
eth1Chain.blocksByHash.del removed.voteData.block_hash.asBlockHash
func trimHeight(eth1Chain: var Eth1Chain, blockNumber: Eth1BlockNumber) =
## Removes all blocks above certain `blockNumber`
while eth1Chain.blocks.len > 0:
if eth1Chain.blocks.peekLast.number > blockNumber:
eth1Chain.popBlock()
else:
break
func addBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block) =
func addBlock(eth1Chain: var Eth1Chain, newBlock: Eth1Block) =
eth1Chain.blocks.addLast newBlock
eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
proc allDepositsUpTo*(m: MainchainMonitor, totalDeposits: uint64): seq[Deposit] =
for i in 0'u64 ..< totalDeposits:
result.add Deposit(data: m.db.deposits.get(i))
func clear*(eth1Chain: var Eth1Chain) =
eth1Chain = default(Eth1Chain)
template hash*(x: Eth1Block): Hash =
hash(x.voteData.block_hash.data)
proc close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
proc close*(p: Web3DataProviderRef): Future[void] {.async.} =
if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe()
await p.web3.close()
proc getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
proc getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash):
Future[BlockObject] =
return p.web3.provider.eth_getBlockByHash(hash, false)
proc getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] =
proc getBlockByNumber*(p: Web3DataProviderRef,
number: Eth1BlockNumber): Future[BlockObject] =
return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false)
proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} =
proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash):
Future[Eth1BlockNumber] {.async.} =
try:
let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts):
return 0
@ -190,14 +169,12 @@ proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): Future[Eth1BlockNu
hash = $hash, err = exc.msg
raise exc
template readJsonField(j: JsonNode,
fieldName: string,
ValueType: type): untyped =
template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped =
var res: ValueType
fromJson(j[fieldName], fieldName, res)
res
proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] =
proc depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] =
if depositsList.kind != JArray:
raise newException(CatchableError,
"Web3 provider didn't return a list of deposit events")
@ -238,39 +215,56 @@ proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] =
amount: bytes_to_uint64(array[8, byte](amount)),
signature: ValidatorSig.init(array[96, byte](signature))))
proc fetchDepositData*(p: Web3DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]]
{.async, locks: 0.} =
var currentBlock = fromBlock
while currentBlock <= toBlock:
var blocksPerRequest = 128'u64
while true:
let requestToBlock = min(toBlock, currentBlock + blocksPerRequest - 1)
proc fetchTimestamp(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
let web3block = await p.getBlockByHash(blk.voteData.block_hash.asBlockHash)
blk.timestamp = Eth1BlockTimestamp web3block.timestamp
debug "Obtaining deposit log events",
fromBlock = currentBlock,
toBlock = requestToBlock
when hasDepositRootChecks:
type
DepositContractDataStatus = enum
Fetched
VerifiedCorrect
DepositRootIncorrect
DepositRootUnavailable
DepositCountIncorrect
DepositCountUnavailable
let depositLogs = try:
await p.ns.getJsonLogs(
DepositEvent,
fromBlock = some blockId(currentBlock),
toBlock = some blockId(requestToBlock))
proc fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
Future[DepositContractDataStatus] {.async.} =
let
depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number)
try:
let fetchedRoot = asEth2Digest(await depositRoot)
if blk.voteData.deposit_root == default(Eth2Digest):
blk.voteData.deposit_root = fetchedRoot
result = Fetched
elif blk.voteData.deposit_root == fetchedRoot:
result = VerifiedCorrect
else:
result = DepositRootIncorrect
except CatchableError as err:
blocksPerRequest = blocksPerRequest div 2
if blocksPerRequest > 0:
continue
raise err
debug "Failed to fetch deposits root",
blockNumber = blk.number,
err = err.msg
result = DepositRootUnavailable
currentBlock = requestToBlock + 1
result.add readJsonDeposits(depositLogs)
break # breaks the inner "retry" loop and continues
# to the next range of blocks to request
try:
let fetchedCount = bytes_to_uint64(array[8, byte](await rawCount))
if blk.voteData.deposit_count == 0:
blk.voteData.deposit_count = fetchedCount
elif blk.voteData.deposit_count != fetchedCount:
result = DepositCountIncorrect
except CatchableError as err:
debug "Failed to fetch deposits count",
blockNumber = blk.number,
err = err.msg
result = DepositCountUnavailable
proc onBlockHeaders*(p: Web3DataProviderRef,
blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler): Future[void]
{.async, gcsafe.} =
errorHandler: SubscriptionErrorHandler) {.async.} =
if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe()
@ -280,7 +274,7 @@ proc onBlockHeaders*(p: Web3DataProviderRef,
blockHeaderHandler, errorHandler)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#get_eth1_data
proc getBlockProposalData*(m: MainchainMonitor,
proc getBlockProposalData*(m: Eth1Monitor,
state: BeaconState,
finalizedEth1Data: Eth1Data): (Eth1Data, seq[Deposit]) =
# TODO To make block proposal cheaper, we can perform this action more regularly
@ -339,7 +333,7 @@ proc getBlockProposalData*(m: MainchainMonitor,
swap(result[1], deposits)
proc init*(T: type MainchainMonitor,
proc init*(T: type Eth1Monitor,
db: BeaconChainDB,
preset: RuntimePreset,
web3Url: string,
@ -375,114 +369,62 @@ proc init*(T: type MainchainMonitor,
db: db,
preset: preset,
dataProvider: dataProvider,
depositQueue: newAsyncQueue[Eth1BlockHeader](),
eth1Progress: newAsyncEvent(),
eth1Chain: Eth1Chain(knownStart: knownStart))
proc fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
let
web3Block = p.getBlockByNumber(blk.number)
depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number)
proc allDepositsUpTo(m: Eth1Monitor, totalDeposits: uint64): seq[Deposit] =
for i in 0'u64 ..< totalDeposits:
result.add Deposit(data: m.db.deposits.get(i))
var error: ref CatchableError = nil
try: blk.voteData.deposit_root = asEth2Digest(await depositRoot)
except CatchableError as err: error = err
try: blk.voteData.deposit_count = bytes_to_uint64(array[8, byte](await rawCount))
except CatchableError as err: error = err
if error != nil:
debug "Deposit contract data not available",
blockNumber = blk.number,
err = error.msg
blk.timestamp = Eth1BlockTimestamp (await web3Block).timestamp
proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float):
Future[tuple[genesisBlock: Eth1Block, previousBlock: Eth1Block]] {.async.} =
let followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK) *
m.preset.ETH1_FOLLOW_DISTANCE
var prevBlock: Eth1Block
# TODO: The DB operations should be executed as a transaction here
block: # TODO Begin Transaction
while m.eth1Chain.blocks.len > 0:
# TODO keep a separate set of candidate blocks
var blk = m.eth1Chain.blocks.peekFirst
if blk.timestamp != 0:
await fetchBlockDetails(m.dataProvider, blk)
if float(blk.timestamp + followDistanceInSeconds) > timeNow:
break
for deposit in blk.deposits:
m.db.processDeposit(deposit.data)
# TODO compare our results against the web3 provider
blk.voteData.deposit_count = m.db.finalizedEth1DepositsMerkleizer.totalChunks
blk.voteData.deposit_root = m.db.finalizedEth1DepositsMerkleizer.getFinalHash
# TODO
# Try to confirm history by obtaining deposit_root from a more
# recent block
# TODO The len property is currently stored in memory which
# makes it unsafe in the face of failed transactions
let activeValidatorsCount = m.db.immutableValidatorData.lenu64
blk.knownValidatorsCount = some activeValidatorsCount
discard m.eth1Chain.blocks.popFirst()
m.eth1Chain.blocksByHash.del blk.voteData.block_hash.asBlockHash
let blockGenesisTime = genesis_time_from_eth1_timestamp(m.preset,
blk.timestamp)
if blockGenesisTime >= m.preset.MIN_GENESIS_TIME and
activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
result = (blk, prevBlock)
prevBlock = blk
if prevBlock != nil:
# TODO commit transaction
m.db.putEth1PersistedTo prevBlock.voteData
m.eth1Chain.knownStart = prevBlock.voteData
notice "Eth1 sync progress",
blockNumber = prevBlock.number,
depositsProcessed = prevBlock.voteData.deposit_count
# TODO Commit
proc createGenesisState(m: MainchainMonitor, eth1Block: Eth1Block): BeaconStateRef =
proc createGenesisState(m: Eth1Monitor, eth1Block: Eth1Block): BeaconStateRef =
notice "Generating genesis state",
blockNum = eth1Block.number,
blockHash = eth1Block.voteData.block_hash,
blockTimestamp = eth1Block.timestamp,
totalDeposits = eth1Block.voteData.deposit_count,
activeValidators = eth1Block.knownValidatorsCount.get
activeValidators = eth1Block.activeValidatorsCount
var deposits = m.allDepositsUpTo(eth1Block.voteData.deposit_count)
attachMerkleProofs deposits
initialize_beacon_state_from_eth1(m.preset,
result = initialize_beacon_state_from_eth1(
m.preset,
eth1Block.voteData.block_hash,
eth1Block.timestamp.uint64,
deposits, {})
proc signalGenesis(m: MainchainMonitor, genesisState: BeaconStateRef) =
doAssert result.validators.lenu64 == eth1Block.activeValidatorsCount
proc signalGenesis(m: Eth1Monitor, genesisState: BeaconStateRef) =
m.genesisState = genesisState
if not m.genesisStateFut.isNil:
m.genesisStateFut.complete()
m.genesisStateFut = nil
proc findGenesisBlockInRange(m: MainchainMonitor,
startBlock, endBlock: Eth1Block): Future[Eth1Block]
{.async.} =
template hasEnoughValidators(m: Eth1Monitor, blk: Eth1Block): bool =
blk.activeValidatorsCount >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT
func isAfterMinGenesisTime(m: Eth1Monitor, blk: Eth1Block): bool =
doAssert blk.timestamp != 0
let t = genesis_time_from_eth1_timestamp(m.preset, uint64 blk.timestamp)
t >= m.preset.MIN_GENESIS_TIME
func isGenesisCandidate(m: Eth1Monitor, blk: Eth1Block): bool =
m.hasEnoughValidators(blk) and m.isAfterMinGenesisTime(blk)
proc findGenesisBlockInRange(m: Eth1Monitor, startBlock, endBlock: Eth1Block):
Future[Eth1Block] {.async.} =
doAssert startBlock.timestamp != 0 and not m.isAfterMinGenesisTime(startBlock)
doAssert endBlock.timestamp != 0 and m.isAfterMinGenesisTime(endBlock)
doAssert m.hasEnoughValidators(startBlock)
doAssert m.hasEnoughValidators(endBlock)
var
startBlock = startBlock
endBlock = endBlock
depositData = startBlock.voteData
activeValidatorsCountDuringRange = startBlock.activeValidatorsCount
while startBlock.number + 1 < endBlock.number:
let
@ -491,7 +433,7 @@ proc findGenesisBlockInRange(m: MainchainMonitor,
secondsPerBlock = float(endBlock.timestamp - startBlock.timestamp) /
float(endBlock.number - startBlock.number)
blocksToJump = max(float(MIN_GENESIS_TIME - startBlockTime) / secondsPerBlock, 1.0)
candidateNumber = min(endBlock.number - 1, startBlock.number + 1) # blocksToJump.uint64)
candidateNumber = min(endBlock.number - 1, startBlock.number + blocksToJump.uint64)
candidateBlock = await m.dataProvider.getBlockByNumber(candidateNumber)
var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64,
@ -511,6 +453,9 @@ proc findGenesisBlockInRange(m: MainchainMonitor,
else:
endBlock = candidateAsEth1Block
if endBlock.activeValidatorsCount == 0:
endBlock.activeValidatorsCount = activeValidatorsCountDuringRange
return endBlock
proc safeCancel(fut: var Future[void]) =
@ -518,32 +463,95 @@ proc safeCancel(fut: var Future[void]) =
fut.cancel()
fut = nil
proc stop*(m: MainchainMonitor) =
proc stop*(m: Eth1Monitor) =
safeCancel m.runFut
safeCancel m.genesisMonitoringFut
template checkIfShouldStopMainchainMonitor(m: MainchainMonitor) =
if bnStatus == BeaconNodeStatus.Stopping:
if not m.genesisStateFut.isNil:
m.genesisStateFut.complete()
proc waitGenesis*(m: Eth1Monitor): Future[BeaconStateRef] {.async.} =
if m.genesisState.isNil:
if m.genesisStateFut.isNil:
m.genesisStateFut = newFuture[void]("waitGenesis")
info "Waiting genesis state"
await m.genesisStateFut
m.genesisStateFut = nil
m.stop
return
proc checkForGenesisLoop(m: MainchainMonitor) {.async.} =
if m.genesisState != nil:
return m.genesisState
else:
doAssert bnStatus == BeaconNodeStatus.Stopping
return new BeaconStateRef # cannot return nil...
proc syncBlockRange(m: Eth1Monitor, fromBlock, toBlock: Eth1BlockNumber) {.async.} =
var currentBlock = fromBlock
while currentBlock <= toBlock:
var depositLogs: JsonNode = nil
var blocksPerRequest = 5000'u64 # This is roughly a day of Eth1 blocks
while true:
m.checkIfShouldStopMainchainMonitor()
if not m.genesisState.isNil:
return
let requestToBlock = min(toBlock, currentBlock + blocksPerRequest - 1)
debug "Obtaining deposit log events",
fromBlock = currentBlock,
toBlock = requestToBlock
try:
# TODO: check for a stale monitor
let
now = epochTime()
(genesisCandidate, genesisParent) = await m.persistFinalizedBlocks(now)
depositLogs = await m.dataProvider.ns.getJsonLogs(
DepositEvent,
fromBlock = some blockId(currentBlock),
toBlock = some blockId(requestToBlock))
currentBlock = requestToBlock + 1
break
except CatchableError as err:
blocksPerRequest = blocksPerRequest div 2
if blocksPerRequest == 0:
raise err
if genesisCandidate != nil:
let eth1Blocks = depositEventsToBlocks(depositLogs)
for i in 0 ..< eth1Blocks.len:
# TODO: The DB operations should be executed as a transaction here
let blk = eth1Blocks[i]
for deposit in blk.deposits:
m.db.processDeposit(deposit.data)
blk.voteData.deposit_count =
m.db.finalizedEth1DepositsMerkleizer.totalChunks
blk.voteData.deposit_root = mixInLength(
m.db.finalizedEth1DepositsMerkleizer.getFinalHash,
int blk.voteData.deposit_count)
blk.activeValidatorsCount = m.db.immutableValidatorData.lenu64
m.eth1Chain.addBlock blk
if eth1Blocks.len > 0:
let lastBlock = eth1Blocks[^1]
when hasDepositRootChecks:
let status = await m.dataProvider.fetchDepositContractData(lastBlock)
debug "Deposit root checks", status,
ourCount = lastBlock.voteData.deposit_count,
ourRoot = lastBlock.voteData.deposit_root
m.db.putEth1PersistedTo lastBlock.voteData
notice "Eth1 sync progress",
blockNumber = lastBlock.number,
depositsProcessed = lastBlock.voteData.deposit_count
if m.genesisStateFut != nil and m.hasEnoughValidators(lastBlock):
await m.dataProvider.fetchTimestamp(lastBlock)
if m.isAfterMinGenesisTime(lastBlock):
var genesisBlockIdx = m.eth1Chain.blocks.len - 1
for i in 1 ..< eth1Blocks.len:
let idx = (m.eth1Chain.blocks.len - 1) - i
let blk = m.eth1Chain.blocks[idx]
await m.dataProvider.fetchTimestamp(blk)
if m.isGenesisCandidate(blk):
genesisBlockIdx = idx
else:
break
# We have a candidate state on our hands, but our current Eth1Chain
# may consist only of blocks that have deposits attached to them
# while the real genesis may have happened in a block without any
@ -557,105 +565,61 @@ proc checkForGenesisLoop(m: MainchainMonitor) {.async.} =
# We'll handle this special case below by examing whether we are in
# this potential scenario and we'll use a fast guessing algorith to
# discover the ETh1 block with minimal valid genesis time.
if genesisParent != nil:
if genesisParent.knownValidatorsCount.get >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and
genesisParent.number - genesisParent.number > 1:
let genesisBlock = await m.findGenesisBlockInRange(genesisParent, genesisCandidate)
if genesisBlock.number != genesisCandidate.number:
var genesisBlock = m.eth1Chain.blocks[genesisBlockIdx]
if genesisBlockIdx > 0:
let genesisParent = m.eth1Chain.blocks[genesisBlockIdx - 1]
if genesisParent.timestamp == 0:
await m.dataProvider.fetchTimestamp(genesisParent)
if m.hasEnoughValidators(genesisParent) and
genesisBlock.number - genesisParent.number > 1:
genesisBlock = await m.findGenesisBlockInRange(genesisParent,
genesisBlock)
m.signalGenesis m.createGenesisState(genesisBlock)
return
let candidateState = m.createGenesisState(genesisCandidate)
m.signalGenesis candidateState
return
except CatchableError as err:
debug "Unexpected error in checkForGenesisLoop", err = err.msg
await sleepAsync(1.seconds)
proc waitGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} =
if m.genesisState.isNil:
if m.genesisStateFut.isNil:
m.genesisStateFut = newFuture[void]("waitGenesis")
m.genesisMonitoringFut = m.checkForGenesisLoop()
await m.genesisStateFut
m.genesisStateFut = nil
if bnStatus == BeaconNodeStatus.Stopping:
return new BeaconStateRef # cannot return nil...
if m.genesisState != nil:
return m.genesisState
else:
result = new BeaconStateRef # make the compiler happy
raiseAssert "Unreachable code"
func totalNonFinalizedBlocks(eth1Chain: Eth1Chain): Natural =
# TODO: implement this precisely
eth1Chain.blocks.len
func latestEth1Data(eth1Chain: Eth1Chain): Eth1Data =
if eth1Chain.blocks.len > 0:
eth1Chain.blocks[^1].voteData
else:
eth1Chain.knownStart
func knownInvalidDepositsCount(eth1Chain: Eth1Chain): uint64 =
for i in countdown(eth1Chain.blocks.len - 1, 0):
let blk = eth1Chain.blocks[i]
if blk.knownValidatorsCount.isSome:
return blk.voteData.deposit_count - blk.knownValidatorsCount.get
return 0
func maxValidDeposits(eth1Chain: Eth1Chain): uint64 =
if eth1Chain.blocks.len > 0:
let lastBlock = eth1Chain.blocks[^1]
lastBlock.knownValidatorsCount.get(
lastBlock.voteData.deposit_count - eth1Chain.knownInvalidDepositsCount)
else:
0
proc processDeposits(m: MainchainMonitor,
dataProvider: Web3DataProviderRef) {.async.} =
proc handleEth1Progress(m: Eth1Monitor) {.async.} =
# ATTENTION!
# Please note that this code is using a queue to guarantee the
# strict serial order of processing of deposits. If we had the
# same code embedded in the deposit contracts events handler,
# Please note that this code is using an async event in order
# to guarantee the strict serial order of processing of deposits.
# If we had the same code embedded in the new block headers event,
# it could easily re-order the steps due to the interruptible
# interleaved execution of async code.
var eth1SyncedTo = await m.dataProvider.getBlockNumber(
m.eth1Chain.knownStart.block_hash.asBlockHash)
while true:
m.checkIfShouldStopMainchainMonitor()
if bnStatus == BeaconNodeStatus.Stopping:
if not m.genesisStateFut.isNil:
m.genesisStateFut.complete()
m.genesisStateFut = nil
m.stop()
return
let now = epochTime()
discard await m.persistFinalizedBlocks(now)
await m.eth1Progress.wait()
m.eth1Progress.clear()
let blk = await m.depositQueue.popFirst()
m.eth1Chain.trimHeight(Eth1BlockNumber(blk.number) - 1)
if m.latestEth1BlockNumber <= m.preset.ETH1_FOLLOW_DISTANCE:
continue
let latestKnownBlock = if m.eth1Chain.blocks.len > 0:
m.eth1Chain.blocks[^1].number
elif m.eth1Chain.knownStartBlockNum.isSome:
m.eth1Chain.knownStartBlockNum.get
else:
m.eth1Chain.knownStartBlockNum = some(
await dataProvider.getBlockNumber(m.eth1Chain.knownStart.block_hash.asBlockHash))
m.eth1Chain.knownStartBlockNum.get
let targetBlock = m.latestEth1BlockNumber - m.preset.ETH1_FOLLOW_DISTANCE
if targetBlock <= eth1SyncedTo:
continue
let eth1Blocks = await dataProvider.fetchDepositData(latestKnownBlock + 1,
Eth1BlockNumber blk.number)
for i in 0 ..< eth1Blocks.len:
m.eth1Chain.addBlock eth1Blocks[i]
await m.syncBlockRange(eth1SyncedTo + 1, targetBlock)
eth1SyncedTo = targetBlock
proc isRunning*(m: MainchainMonitor): bool =
not m.runFut.isNil
while m.eth1Chain.blocks.len > 0:
# We'll clean old blocks that can no longer be voting candidates.
# Technically, we should check that the block is outside of the current
# voting period as determined by its timestamp, but we'll approximate
# this by requiring a much larger difference in block numbers.
# (i.e. twice the follow distance).
let earliestBlock = m.eth1Chain.blocks.peekFirst
if earliestBlock.number < targetBlock and
targetBlock - earliestBlock.number < m.preset.ETH1_FOLLOW_DISTANCE * 2:
break
m.eth1Chain.popFirst()
func `===`(json: JsonNode, boolean: bool): bool =
json.kind == JBool and json.bval == boolean
proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
proc run(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if delayBeforeStart != ZeroDuration:
await sleepAsync(delayBeforeStart)
@ -665,18 +629,18 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader)
{.raises: [Defect], gcsafe.}:
try:
m.depositQueue.addLastNoWait(blk)
except AsyncQueueFullError:
raiseAssert "The depositQueue has no size limit"
if blk.number.uint64 > m.latestEth1BlockNumber:
m.latestEth1BlockNumber = blk.number.uint64
m.eth1Progress.fire()
except Exception:
# TODO Investigate why this exception is being raised
raiseAssert "queue.addLastNoWait should not raise exceptions"
raiseAssert "AsyncEvent.fire should not raise exceptions"
do (err: CatchableError):
debug "Error while processing Eth1 block headers subscription", err = err.msg
await m.processDeposits(m.dataProvider)
await m.handleEth1Progress()
proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
proc start(m: Eth1Monitor, delayBeforeStart: Duration) =
if m.runFut.isNil:
let runFut = m.run(delayBeforeStart)
m.runFut = runFut
@ -691,7 +655,7 @@ proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
fatal "Fatal exception reached", err = runFut.error.msg
quit 1
proc start*(m: MainchainMonitor) {.inline.} =
proc start*(m: Eth1Monitor) {.inline.} =
m.start(0.seconds)
proc getEth1BlockHash*(url: string, blockId: RtBlockIdentifier): Future[BlockHash] {.async.} =

View File

@ -21,7 +21,7 @@ import
# TODO All tests need to be moved to the test suite.
const depositContractLimit* = Limit(1'u64 shl (DEPOSIT_CONTRACT_TREE_DEPTH - 1'u64))
const depositContractLimit* = Limit(1'u64 shl DEPOSIT_CONTRACT_TREE_DEPTH)
func attachMerkleProofs*(deposits: var openArray[Deposit]) =
let depositsRoots = mapIt(deposits, hash_tree_root(it.data))
@ -30,6 +30,7 @@ func attachMerkleProofs*(deposits: var openArray[Deposit]) =
for i in 0 ..< depositsRoots.len:
incrementalMerkleProofs.addChunkAndGenMerkleProof(depositsRoots[i], deposits[i].proof)
deposits[i].proof[32] = default(Eth2Digest)
deposits[i].proof[32].data[0..7] = toBytesLE uint64(i + 1)
template getProof*(proofs: seq[Eth2Digest], idxParam: int): openArray[Eth2Digest] =

View File

@ -28,13 +28,20 @@ const
zero64 = default array[64, byte]
bitsPerChunk = bytesPerChunk * 8
func binaryTreeHeight*(totalElements: Limit): int =
bitWidth nextPow2(uint64 totalElements)
type
SszChunksMerkleizer* {.requiresInit.} = object
SszMerkleizerImpl = object
combinedChunks: ptr UncheckedArray[Eth2Digest]
totalChunks: uint64
topIndex: int
template chunks*(m: SszChunksMerkleizer): openArray[Eth2Digest] =
SszMerkleizer*[limit: static[Limit]] = object
combinedChunks: ref array[binaryTreeHeight limit, Eth2Digest]
impl: SszMerkleizerImpl
template chunks*(m: SszMerkleizerImpl): openArray[Eth2Digest] =
m.combinedChunks.toOpenArray(0, m.topIndex)
func digest(a, b: openArray[byte]): Eth2Digest =
@ -77,7 +84,7 @@ func computeZeroHashes: array[sizeof(Limit) * 8, Eth2Digest] =
const zeroHashes* = computeZeroHashes()
func addChunk*(merkleizer: var SszChunksMerkleizer, data: openArray[byte]) =
func addChunk*(merkleizer: var SszMerkleizerImpl, data: openArray[byte]) =
doAssert data.len > 0 and data.len <= bytesPerChunk
if getBitLE(merkleizer.totalChunks, 0):
@ -107,7 +114,7 @@ func addChunk*(merkleizer: var SszChunksMerkleizer, data: openArray[byte]) =
template isOdd(x: SomeNumber): bool =
(x and 1) != 0
func addChunkAndGenMerkleProof*(merkleizer: var SszChunksMerkleizer,
func addChunkAndGenMerkleProof*(merkleizer: var SszMerkleizerImpl,
hash: Eth2Digest,
outProof: var openArray[Eth2Digest]) =
var
@ -129,7 +136,7 @@ func addChunkAndGenMerkleProof*(merkleizer: var SszChunksMerkleizer,
merkleizer.totalChunks += 1
func completeStartedChunk(merkleizer: var SszChunksMerkleizer,
func completeStartedChunk(merkleizer: var SszMerkleizerImpl,
hash: Eth2Digest, atLevel: int) =
when false:
let
@ -145,7 +152,7 @@ func completeStartedChunk(merkleizer: var SszChunksMerkleizer,
merkleizer.combinedChunks[i] = hash
break
func addChunksAndGenMerkleProofs*(merkleizer: var SszChunksMerkleizer,
func addChunksAndGenMerkleProofs*(merkleizer: var SszMerkleizerImpl,
chunks: openArray[Eth2Digest]): seq[Eth2Digest] =
doAssert chunks.len > 0 and merkleizer.topIndex > 0
@ -283,17 +290,9 @@ func addChunksAndGenMerkleProofs*(merkleizer: var SszChunksMerkleizer,
merkleizer.totalChunks = newTotalChunks
func binaryTreeHeight*(totalElements: Limit): int =
bitWidth nextPow2(uint64 totalElements)
type
SszMerkleizer*[limit: static[Limit]] = object
combinedChunks: ref array[binaryTreeHeight limit, Eth2Digest]
m: SszChunksMerkleizer
proc init*(S: type SszMerkleizer): S =
new result.combinedChunks
result.m = SszChunksMerkleizer(
result.impl = SszMerkleizerImpl(
combinedChunks: cast[ptr UncheckedArray[Eth2Digest]](
addr result.combinedChunks[][0]),
topIndex: binaryTreeHeight(result.limit) - 1,
@ -304,7 +303,7 @@ proc init*(S: type SszMerkleizer,
totalChunks: uint64): S =
new result.combinedChunks
result.combinedChunks[][0 ..< combinedChunks.len] = combinedChunks
result.m = SszChunksMerkleizer(
result.impl = SszMerkleizerImpl(
combinedChunks: cast[ptr UncheckedArray[Eth2Digest]](
addr result.combinedChunks[][0]),
topIndex: binaryTreeHeight(result.limit) - 1,
@ -313,7 +312,7 @@ proc init*(S: type SszMerkleizer,
proc clone*[L: static[Limit]](cloned: SszMerkleizer[L]): SszMerkleizer[L] =
new result.combinedChunks
result.combinedChunks[] = cloned.combinedChunks[]
result.m = SszChunksMerkleizer(
result.impl = SszMerkleizerImpl(
combinedChunks: cast[ptr UncheckedArray[Eth2Digest]](
addr result.combinedChunks[][0]),
topIndex: binaryTreeHeight(L) - 1,
@ -322,29 +321,29 @@ proc clone*[L: static[Limit]](cloned: SszMerkleizer[L]): SszMerkleizer[L] =
template addChunksAndGenMerkleProofs*(
merkleizer: var SszMerkleizer,
chunks: openArray[Eth2Digest]): seq[Eth2Digest] =
addChunksAndGenMerkleProofs(merkleizer.m, chunks)
addChunksAndGenMerkleProofs(merkleizer.impl, chunks)
template addChunk*(merkleizer: var SszMerkleizer, data: openArray[byte]) =
addChunk(merkleizer.m, data)
addChunk(merkleizer.impl, data)
template totalChunks*(merkleizer: SszMerkleizer): uint64 =
merkleizer.m.totalChunks
merkleizer.impl.totalChunks
template getFinalHash*(merkleizer: SszMerkleizer): Eth2Digest =
merkleizer.m.getFinalHash
merkleizer.impl.getFinalHash
template createMerkleizer*(totalElements: static Limit): SszChunksMerkleizer =
template createMerkleizer*(totalElements: static Limit): SszMerkleizerImpl =
trs "CREATING A MERKLEIZER FOR ", totalElements
const treeHeight = binaryTreeHeight totalElements
var combinedChunks {.noInit.}: array[treeHeight, Eth2Digest]
SszChunksMerkleizer(
SszMerkleizerImpl(
combinedChunks: cast[ptr UncheckedArray[Eth2Digest]](addr combinedChunks),
topIndex: treeHeight - 1,
totalChunks: 0)
func getFinalHash*(merkleizer: SszChunksMerkleizer): Eth2Digest =
func getFinalHash*(merkleizer: SszMerkleizerImpl): Eth2Digest =
if merkleizer.totalChunks == 0:
return zeroHashes[merkleizer.topIndex]
@ -383,7 +382,7 @@ func getFinalHash*(merkleizer: SszChunksMerkleizer): Eth2Digest =
for i in bottomHashIdx + 1 ..< topHashIdx:
result = mergeBranches(result, zeroHashes[i])
func mixInLength(root: Eth2Digest, length: int): Eth2Digest =
func mixInLength*(root: Eth2Digest, length: int): Eth2Digest =
var dataLen: array[32, byte]
dataLen[0..<8] = uint64(length).toBytesLE()
mergeBranches(root, dataLen)
@ -408,7 +407,7 @@ template writeBytesLE(chunk: var array[bytesPerChunk, byte], atParam: int,
let at = atParam
chunk[at ..< at + sizeof(val)] = toBytesLE(val)
func chunkedHashTreeRootForBasicTypes[T](merkleizer: var SszChunksMerkleizer,
func chunkedHashTreeRootForBasicTypes[T](merkleizer: var SszMerkleizerImpl,
arr: openArray[T]): Eth2Digest =
static:
doAssert T is BasicType
@ -460,7 +459,7 @@ func chunkedHashTreeRootForBasicTypes[T](merkleizer: var SszChunksMerkleizer,
getFinalHash(merkleizer)
func bitListHashTreeRoot(merkleizer: var SszChunksMerkleizer, x: BitSeq): Eth2Digest =
func bitListHashTreeRoot(merkleizer: var SszMerkleizerImpl, x: BitSeq): Eth2Digest =
# TODO: Switch to a simpler BitList representation and
# replace this with `chunkedHashTreeRoot`
trs "CHUNKIFYING BIT SEQ WITH TOP INDEX ", merkleizer.topIndex

View File

@ -185,11 +185,11 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
# Advance state to the slot that we're proposing for
node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)):
let (eth1data, deposits) =
if node.mainchainMonitor.isNil:
if node.eth1Monitor.isNil:
(state.eth1_data, newSeq[Deposit]())
else:
let finalizedEth1Data = node.chainDag.getFinalizedEpochRef().eth1_data
node.mainchainMonitor.getBlockProposalData(state, finalizedEth1Data)
node.eth1Monitor.getBlockProposalData(state, finalizedEth1Data)
let
poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived

View File

@ -227,6 +227,7 @@ func attachMerkleProofsReferenceImpl(deposits: var openArray[Deposit]) =
for val_idx in 0 ..< deposits.len:
deposits[val_idx].proof[0..31] = merkle_tree.getMerkleProof(val_idx, true)
deposits[val_idx].proof[32] = default(Eth2Digest)
deposits[val_idx].proof[32].data[0..7] = uint_to_bytes8((val_idx + 1).uint64)
doAssert is_valid_merkle_branch(