Switch to monitoring of Blocks; More accurate genesis detection and faster Eth1 syncing

This commit is contained in:
Zahary Karadjov 2020-06-27 15:01:19 +03:00 committed by zah
parent 816779733e
commit 31e31bb30c
5 changed files with 412 additions and 330 deletions

View File

@ -145,19 +145,27 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
# Try file from command line first
if genesisState.isNil:
# Didn't work, try creating a genesis state using main chain monitor
if conf.web3Url.len == 0:
fatal "Web3 URL not specified"
quit 1
if conf.depositContractAddress.len == 0:
fatal "Deposit contract address not specified"
quit 1
if conf.depositContractDeployedAt.isNone:
# When we don't have a known genesis state, the network metadata
# must specify the deployment block of the contract.
fatal "Deposit contract deployment block not specified"
quit 1
# TODO Could move this to a separate "GenesisMonitor" process or task
# that would do only this - see Paul's proposal for this.
if conf.web3Url.len > 0 and conf.depositContractAddress.len > 0:
mainchainMonitor = MainchainMonitor.init(
web3Provider(conf.web3Url),
conf.depositContractAddress,
conf.depositContractDeployedAt,
FromContractDeploymentBlock)
mainchainMonitor.start()
else:
error "No initial state, need genesis state or deposit contract address"
quit 1
mainchainMonitor = MainchainMonitor.init(
web3Provider(conf.web3Url),
conf.depositContractAddress,
Eth1Data(block_hash: conf.depositContractDeployedAt.get, deposit_count: 0))
mainchainMonitor.start()
genesisState = await mainchainMonitor.getGenesis()
@ -195,8 +203,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
mainchainMonitor = MainchainMonitor.init(
web3Provider(conf.web3Url),
conf.depositContractAddress,
some blockPool.headState.data.data.eth1_data.block_hash,
FromSnapshot)
blockPool.headState.data.data.eth1_data)
# TODO if we don't have any validators attached, we don't need a mainchain
# monitor
mainchainMonitor.start()

View File

@ -4,6 +4,8 @@ import
spec/[datatypes, digest, crypto, beaconstate, helpers],
merkle_minimal
from times import epochTime
export
ethtypes
@ -21,7 +23,6 @@ contract(DepositContract):
amount: Bytes8,
signature: Bytes96,
index: Bytes8) {.event.}
# TODO
# The raises list of this module are still not usable due to general
# Exceptions being reported from Chronos's asyncfutures2.
@ -35,36 +36,29 @@ type
timestamp*: Eth1BlockTimestamp
deposits*: seq[Deposit]
voteData*: Eth1Data
StartKind* = enum
FromContractDeploymentBlock
FromSnapshot
knownGoodDepositsCount*: Option[uint64]
Eth1Chain* = object
knownStart: Eth1Data
knownStartBlockNum: Option[Eth1BlockNumber]
blocks: Deque[Eth1Block]
blocksByHash: Table[BlockHash, Eth1Block]
startKind: StartKind
allDeposits*: seq[Deposit]
MainchainMonitor* = ref object
depositContractAddress: Address
startBlock: Option[Eth2Digest]
dataProviderFactory*: DataProviderFactory
genesisState: NilableBeaconStateRef
genesisStateFut: Future[void]
genesisMonitoringFut: Future[void]
eth1Chain: Eth1Chain
depositQueue: AsyncQueue[DepositQueueElem]
depositQueue: AsyncQueue[BlockHeader]
runFut: Future[void]
Web3EventType = enum
NewEvent
RemovedEvent
DepositQueueElem = (BlockHash, Web3EventType)
DataProvider* = object of RootObj
DataProviderRef* = ref DataProvider
@ -79,7 +73,7 @@ type
url: string
web3: Web3
ns: Sender[DepositContract]
subscription: Subscription
blockHeadersSubscription: Subscription
Web3DataProviderRef* = ref Web3DataProvider
@ -97,11 +91,20 @@ type
const
reorgDepthLimit = 1000
web3Timeouts = 5.seconds
followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE)
totalDepositsNeededForGenesis = uint64 max(SLOTS_PER_EPOCH,
MIN_GENESIS_ACTIVE_VALIDATOR_COUNT)
# TODO Nim's analysis on the lock level of the methods in this
# module seems broken. Investigate and file this as an issue.
{.push warning[LockLevel]: off.}
static:
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/beacon-chain.md#genesis
when SPEC_VERSION == "0.12.1":
doAssert SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE < GENESIS_DELAY,
"Invalid configuration: GENESIS_DELAY is set too low"
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#get_eth1_data
func compute_time_at_slot(state: BeaconState, slot: Slot): uint64 =
state.genesis_time + slot * SECONDS_PER_SLOT
@ -123,6 +126,9 @@ func asEth2Digest*(x: BlockHash): Eth2Digest =
template asBlockHash(x: Eth2Digest): BlockHash =
BlockHash(x.data)
func shortLog(b: Eth1Block): string =
&"{b.number}:{shortLog b.voteData.block_hash}"
func getDepositsInRange(eth1Chain: Eth1Chain,
sinceBlock, latestBlock: Eth1BlockNumber): seq[Deposit] =
## Returns all deposits that happened AFTER the block `sinceBlock` (not inclusive).
@ -164,41 +170,38 @@ func latestCandidateBlock(eth1Chain: Eth1Chain, periodStart: uint64): Eth1Block
if is_candidate_block(blk, periodStart):
return blk
func popBlock(eth1Chain: var Eth1Chain) =
let removed = eth1Chain.blocks.popLast
eth1Chain.blocksByHash.del removed.voteData.block_hash.asBlockHash
func trimHeight(eth1Chain: var Eth1Chain, blockNumber: Eth1BlockNumber) =
## Removes all blocks above certain `blockNumber`
if eth1Chain.blocks.len == 0:
return
while eth1Chain.blocks.len > 0:
if eth1Chain.blocks.peekLast.number > blockNumber:
eth1Chain.popBlock()
else:
break
let newLen = max(0, int(blockNumber - eth1Chain.blocks[0].number + 1))
for i in newLen ..< eth1Chain.blocks.len:
let removed = eth1Chain.blocks.popLast
eth1Chain.blocksByHash.del removed.voteData.block_hash.asBlockHash
template purgeChain*(eth1Chain: var Eth1Chain, blk: Eth1Block) =
## This is used when we discover that a previously considered block
## is no longer part of the selected chain (due to a reorg). We can
## then remove from our chain together with all blocks that follow it.
trimHeight(eth1Chain, blk.number - 1)
func purgeChain*(eth1Chain: var Eth1Chain, blockHash: BlockHash) =
let blk = eth1Chain.findBlock(blockHash)
if blk != nil: eth1Chain.purgeChain(blk)
template purgeDescendants*(eth1CHain: Eth1Chain, blk: Eth1Block) =
trimHeight(eth1Chain, blk.number)
if eth1Chain.blocks.len > 0:
eth1Chain.allDeposits.setLen(eth1Chain.blocks[^1].voteData.deposit_count)
else:
eth1Chain.allDeposits.setLen(0)
func isSuccessorBlock(eth1Chain: Eth1Chain, newBlock: Eth1Block): bool =
if eth1Chain.blocks.len == 0:
return eth1Chain.startKind == FromSnapshot or
newBlock.deposits.len.uint64 == newBlock.voteData.deposit_count
let currentDepositCount = if eth1Chain.blocks.len == 0:
eth1Chain.knownStart.deposit_count
else:
let lastBlock = eth1Chain.blocks.peekLast
if lastBlock.number >= newBlock.number: return false
lastBlock.voteData.deposit_count
let lastBlock = eth1Chain.blocks.peekLast
lastBlock.number < newBlock.number and
(lastBlock.voteData.deposit_count + newBlock.deposits.len.uint64) == newBlock.voteData.deposit_count
(currentDepositCount + newBlock.deposits.len.uint64) == newBlock.voteData.deposit_count
func addSuccessorBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block): bool =
result = isSuccessorBlock(eth1Chain, newBlock)
if result:
eth1Chain.allDeposits.add newBlock.deposits
reset newBlock.deposits
eth1Chain.blocks.addLast newBlock
eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
@ -227,6 +230,14 @@ method getBlockByHash*(p: DataProviderRef, hash: BlockHash): Future[BlockObject]
.} =
notImplemented
method getBlockByNumber*(p: DataProviderRef, hash: Eth1BlockNumber): Future[BlockObject] {.
base
gcsafe
locks: 0
# raises: [Defect]
.} =
notImplemented
method onDisconnect*(p: DataProviderRef, handler: DisconnectHandler) {.
base
gcsafe
@ -235,9 +246,9 @@ method onDisconnect*(p: DataProviderRef, handler: DisconnectHandler) {.
.} =
notImplemented
method onDepositEvent*(p: DataProviderRef,
startBlock: Eth1BlockNumber,
handler: DepositEventHandler): Future[void] {.
method onBlockHeaders*(p: DataProviderRef,
blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler): Future[void] {.
base
gcsafe
locks: 0
@ -253,8 +264,8 @@ method close*(p: DataProviderRef): Future[void] {.
.} =
notImplemented
method hasDepositContract*(p: DataProviderRef,
web3Block: BlockObject): Future[bool] {.
method fetchDepositData*(p: DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] {.
base
gcsafe
locks: 0
@ -262,8 +273,7 @@ method hasDepositContract*(p: DataProviderRef,
.} =
notImplemented
method fetchDepositData*(p: DataProviderRef,
web3Block: BlockObject): Future[Eth1Block] {.
method fetchBlockDetails(p: DataProviderRef, blk: Eth1Block): Future[void] {.
base
gcsafe
locks: 0
@ -271,6 +281,146 @@ method fetchDepositData*(p: DataProviderRef,
.} =
notImplemented
proc new*(T: type Web3DataProvider,
web3Url: string,
depositContractAddress: Address): Future[ref Web3DataProvider] {.
async
# raises: [Defect]
.} =
try:
type R = ref T
let
web3 = await newWeb3(web3Url)
ns = web3.contractSender(DepositContract, depositContractAddress)
return R(url: web3Url, web3: web3, ns: ns)
except CatchableError:
return nil
func web3Provider*(web3Url: string): DataProviderFactory =
proc factory(depositContractAddress: Address): Future[DataProviderRef] {.async.} =
result = await Web3DataProvider.new(web3Url, depositContractAddress)
DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory)
method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe()
await p.web3.close()
method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
return p.web3.provider.eth_getBlockByHash(hash, false)
method getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] =
return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false)
proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} =
debug "Querying block number", hash = $hash
try:
let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts):
return 0
return Eth1BlockNumber(blk.number)
except CatchableError as exc:
notice "Failed to get Eth1 block number from hash",
hash = $hash, err = exc.msg
raise
template readJsonField(j: JsonNode,
fieldName: string,
ValueType: type): untyped =
var res: ValueType
fromJson(j[fieldName], fieldName, res)
res
proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] =
if depositsList.kind != JArray:
raise newException(CatchableError,
"Web3 provider didn't return a list of deposit events")
var lastEth1Block: Eth1Block
for logEvent in depositsList:
let
blockNumber = Eth1BlockNumber readJsonField(logEvent, "blockNumber", Quantity)
blockHash = readJsonField(logEvent, "blockHash", BlockHash)
logData = strip0xPrefix(logEvent["data"].getStr)
if lastEth1Block == nil or lastEth1Block.number != blockNumber:
lastEth1Block = Eth1Block(
number: blockNumber,
voteData: Eth1Data(block_hash: blockHash.asEth2Digest))
result.add lastEth1Block
var
pubkey: Bytes48
withdrawalCredentials: Bytes32
amount: Bytes8
signature: Bytes96
index: Bytes8
var offset = 0
offset += decode(logData, offset, pubkey)
offset += decode(logData, offset, withdrawalCredentials)
offset += decode(logData, offset, amount)
offset += decode(logData, offset, signature)
offset += decode(logData, offset, index)
lastEth1Block.deposits.add Deposit(
data: DepositData(
pubkey: ValidatorPubKey.init(array[48, byte](pubkey)),
withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)),
amount: bytes_to_int(array[8, byte](amount)),
signature: ValidatorSig.init(array[96, byte](signature))))
method fetchDepositData*(p: Web3DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]]
{.async, locks: 0.} =
return readJsonDeposits(await p.ns.getJsonLogs(DepositEvent,
fromBlock = some blockId(fromBlock),
toBlock = some blockId(toBlock)))
method fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
let
web3Block = p.getBlockByNumber(blk.number)
depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
rawCount = p.ns.get_deposit_count.call(blockNumber = blk.number)
discard await web3Block
discard await depositRoot
discard await rawCount
let depositCount = bytes_to_int(array[8, byte](rawCount.read))
blk.timestamp = Eth1BlockTimestamp(web3Block.read.timestamp)
blk.voteData.deposit_count = depositCount
blk.voteData.deposit_root = depositRoot.read.asEth2Digest
method onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {.
gcsafe
locks: 0
# raises: []
.} =
p.web3.onDisconnect = handler
method onBlockHeaders*(p: Web3DataProviderRef,
blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler): Future[void] {.
async
gcsafe
locks: 0
# raises: []
.} =
if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe()
debug "Subscribing for block headers"
let options = newJObject()
p.blockHeadersSubscription = await p.web3.subscribeForBlockHeaders(
options, blockHeaderHandler, errorHandler)
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#get_eth1_data
func getBlockProposalData*(eth1Chain: Eth1Chain,
state: BeaconState): (Eth1Data, seq[Deposit]) =
@ -310,175 +460,100 @@ template getBlockProposalData*(m: MainchainMonitor, state: BeaconState): untyped
proc init*(T: type MainchainMonitor,
dataProviderFactory: DataProviderFactory,
depositContractAddress: string,
startBlock: Option[Eth2Digest],
startKind: StartKind): T =
T(depositQueue: newAsyncQueue[DepositQueueElem](),
startPosition: Eth1Data): T =
T(depositQueue: newAsyncQueue[BlockHeader](),
dataProviderFactory: dataProviderFactory,
depositContractAddress: Address.fromHex(depositContractAddress),
startBlock: startBlock,
eth1Chain: Eth1Chain(startKind: startKind))
eth1Chain: Eth1Chain(knownStart: startPosition))
proc readJsonDeposits(depositsList: JsonNode): seq[Deposit] =
if depositsList.kind != JArray:
raise newException(CatchableError,
"Web3 provider didn't return a list of deposit events")
proc isCandidateForGenesis(timeNow: float, blk: Eth1Block): bool =
if float(blk.timestamp + followDistanceInSeconds) > timeNow:
return false
for logEvent in depositsList:
var logData = strip0xPrefix(logEvent["data"].getStr)
var
pubkey: Bytes48
withdrawalCredentials: Bytes32
amount: Bytes8
signature: Bytes96
index: Bytes8
if genesis_time_from_eth1_timestamp(blk.timestamp) < MIN_GENESIS_TIME:
return false
var offset = 0
offset += decode(logData, offset, pubkey)
offset += decode(logData, offset, withdrawalCredentials)
offset += decode(logData, offset, amount)
offset += decode(logData, offset, signature)
offset += decode(logData, offset, index)
if blk.knownGoodDepositsCount.isSome:
blk.knownGoodDepositsCount.get >= totalDepositsNeededForGenesis
else:
blk.voteData.deposit_count >= totalDepositsNeededForGenesis
result.add Deposit(
data: DepositData(
pubkey: ValidatorPubKey.init(array[48, byte](pubkey)),
withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)),
amount: bytes_to_int(array[8, byte](amount)),
signature: ValidatorSig.init(array[96, byte](signature))))
proc checkForGenesisEvent(m: MainchainMonitor) =
if not m.genesisState.isNil:
proc minGenesisCandidateBlockIdx(eth1Chain: Eth1Chain): Option[int]
{.raises: [Defect].} =
if eth1Chain.blocks.len == 0:
return
let lastBlock = m.eth1Chain.blocks.peekLast
const totalDepositsNeeded = max(SLOTS_PER_EPOCH,
MIN_GENESIS_ACTIVE_VALIDATOR_COUNT)
let now = epochTime()
if not isCandidateForGenesis(now, eth1Chain.blocks.peekLast):
return
if lastBlock.timestamp.uint64 >= MIN_GENESIS_TIME.uint64 and
m.eth1Chain.totalDeposits >= totalDepositsNeeded:
# This block is a genesis candidate
let startTime = lastBlock.timestamp.uint64
var genesisDeposits = m.eth1Chain.allDeposits
attachMerkleProofs genesisDeposits
var s = initialize_beacon_state_from_eth1(lastBlock.voteData.block_hash,
startTime, genesisDeposits, {})
if is_valid_genesis_state(s[]):
# https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
info "Eth2 genesis state detected",
genesisTime = startTime,
genesisEth1Block = lastBlock.voteData.block_hash
var candidatePos = eth1Chain.blocks.len - 1
while candidatePos > 1:
if not isCandidateForGenesis(now, eth1Chain.blocks[candidatePos - 1]):
break
dec candidatePos
s.genesis_time = startTime
m.genesisState = s
return some(candidatePos)
if not m.genesisStateFut.isNil:
m.genesisStateFut.complete()
m.genesisStateFut = nil
proc tryCreatingGenesisState(m: MainchainMonitor,
genesisCandidate: Eth1Block,
deposits: var openarray[Deposit]): uint64 =
attachMerkleProofs deposits
proc processDeposits(m: MainchainMonitor,
dataProvider: DataProviderRef,
startBlkNum: Eth1BlockNumber) {.async.} =
# ATTENTION!
# Please note that this code is using a queue to guarantee the
# strict serial order of processing of deposits. If we had the
# same code embedded in the deposit contracts events handler,
# it could easily re-order the steps due to the intruptable
# interleaved execution of async code.
let
s = initialize_beacon_state_from_eth1(genesisCandidate.voteData.block_hash,
genesisCandidate.timestamp.uint64,
deposits, {})
active_validator_indices = get_active_validator_indices(s[], GENESIS_EPOCH)
if is_valid_genesis_state(s[], active_validator_indices):
# https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
info "Eth2 genesis state detected",
genesisTime = s.genesisTime,
genesisEth1Block = genesisCandidate.voteData.block_hash
m.genesisState = s
if not m.genesisStateFut.isNil:
m.genesisStateFut.complete()
m.genesisStateFut = nil
else:
info "Eth2 genesis candidate block rejected",
`block` = shortLog(genesisCandidate),
validDeposits = active_validator_indices.len,
needed = totalDepositsNeededForGenesis
uint64(active_validator_indices.len)
proc checkForGenesisLoop(m: MainchainMonitor) {.async.} =
while true:
let (blockHash, eventType) = await m.depositQueue.popFirst()
if not m.genesisState.isNil:
return
if eventType == RemovedEvent:
debug "New Eth1 head selected. Purging history of deposits",
purgedBlock = $blockHash
m.eth1Chain.purgeChain(blockHash)
continue
try:
let genesisCandidateIdx = m.eth1Chain.minGenesisCandidateBlockIdx
if genesisCandidateIdx.isSome:
let
genesisCandidate = m.eth1Chain.blocks[genesisCandidateIdx.get]
eligibleDepositCount = genesisCandidate.voteData.deposit_count
let cachedBlock = m.eth1Chain.findBlock(blockHash)
if cachedBlock == nil:
try:
let web3Block = await dataProvider.getBlockByHash(blockHash)
if Eth1BlockNumber(web3Block.number) < startBlkNum:
warn "Invalid deposit reported from the web3 end-point",
reportedBlock = web3Block.number.uint64, minExpectedBlock = startBlkNum
continue
genesisCandidate.knownGoodDepositsCount = some tryCreatingGenesisState(
m, genesisCandidate,
m.eth1Chain.allDeposits.toOpenArray(0, int(eligibleDepositCount - 1)))
else:
# TODO: check for a stale monitor
discard
except CatchableError as err:
debug "Unexpected error in checkForGenesisLoop", err = err.msg
let eth1Block = await dataProvider.fetchDepositData(web3Block)
if m.eth1Chain.addSuccessorBlock(eth1Block):
# TODO: We may check that the new deposits produce a merkle
# root matching the `deposit_root` value from the block.
# Not doing this is equivalent to trusting the Eth1
# execution engine and data provider.
info "Eth1 block processed", eth1data = eth1Block.voteData
m.checkForGenesisEvent()
else:
# We are missing the parent block.
# This shouldn't be happening if the deposits events are reported in
# proper order, but nevertheless let's try to repair our chain:
var cachedParent = m.eth1Chain.findParent(web3Block)
doAssert cachedParent == nil
var chainOfParents = newSeq[Eth1Block]()
var parentHash = web3Block.parentHash
var expectedParentBlockNumber = web3Block.number.uint64 - 1
debug "Eth1 parent block missing. Attempting to request from the network",
parentHash = parentHash.toHex, expectedParentBlockNumber
while true:
if chainOfParents.len > reorgDepthLimit:
error "Detected Eth1 re-org exceeded the maximum depth limit",
headBlockHash = web3Block.hash.toHex,
ourHeadHash = m.eth1Chain.blocks.peekLast.voteData.block_hash
raise newException(ReorgDepthLimitExceeded, "Reorg depth limit exceeded")
let parentWeb3Block = await dataProvider.getBlockByHash(parentHash)
if parentWeb3Block.number.uint64 != expectedParentBlockNumber:
error "Eth1 data provider supplied invalid parent block",
parentBlockNumber = parentWeb3Block.number.uint64,
expectedParentBlockNumber, parentHash = parentHash.toHex
raise newException(CorruptDataProvider,
"Parent block with incorrect number")
if expectedParentBlockNumber <= startBlkNum or
startBlkNum == 0 and not await dataProvider.hasDepositContract(parentWeb3Block):
# We've reached the deposit contract creation
# No more deposit events are expected
m.eth1Chain.clear()
for i in countdown(chainOfParents.len - 1, 0):
let isSuccessor = m.eth1Chain.addSuccessorBlock chainOfParents[i]
doAssert isSuccessor
cachedParent = m.eth1Chain.blocks.peekLast
break
chainOfParents.add(await dataProvider.fetchDepositData(parentWeb3Block))
let localParent = m.eth1Chain.findParent(parentWeb3Block)
if localParent != nil:
m.eth1Chain.purgeDescendants(localParent)
for i in countdown(chainOfParents.len - 1, 0):
let isSuccessor = m.eth1Chain.addSuccessorBlock chainOfParents[i]
doAssert isSuccessor
cachedParent = m.eth1Chain.blocks.peekLast
break
dec expectedParentBlockNumber
parentHash = parentWeb3Block.parentHash
m.eth1Chain.purgeDescendants(cachedParent)
except CatchableError as err:
# Connection problem? Put the unprocessed deposit back to queue.
# Raising the exception here will lead to a restart of the whole monitor.
m.depositQueue.addFirstNoWait((blockHash, eventType))
raise err
proc isRunning*(m: MainchainMonitor): bool =
not m.runFut.isNil
await sleepAsync(1.seconds)
proc getGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} =
if m.genesisState.isNil:
if m.genesisStateFut.isNil:
m.genesisStateFut = newFuture[void]("getGenesis")
m.genesisMonitoringFut = m.checkForGenesisLoop()
await m.genesisStateFut
m.genesisStateFut = nil
@ -488,92 +563,101 @@ proc getGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} =
result = new BeaconStateRef # make the compiler happy
raiseAssert "Unreachable code"
method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
return p.web3.provider.eth_getBlockByHash(hash, false)
func totalNonFinalizedBlocks(eth1Chain: Eth1Chain): Natural =
# TODO: implement this precisely
eth1Chain.blocks.len
method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
if p.subscription != nil:
await p.subscription.unsubscribe()
await p.web3.close()
func latestEth1Data(eth1Chain: Eth1Chain): Eth1Data =
if eth1Chain.blocks.len > 0:
eth1Chain.blocks[^1].voteData
else:
eth1Chain.knownStart
method hasDepositContract*(p: Web3DataProviderRef,
web3Block: BlockObject): Future[bool] {.async, locks: 0.} =
result = await p.ns.isDeployed(web3Block.blockId)
func knownInvalidDepositsCount(eth1Chain: Eth1Chain): uint64 =
for i in countdown(eth1Chain.blocks.len - 1, 0):
let blk = eth1Chain.blocks[i]
if blk.knownGoodDepositsCount.isSome:
return blk.voteData.deposit_count - blk.knownGoodDepositsCount.get
method fetchDepositData*(p: Web3DataProviderRef,
web3Block: BlockObject): Future[Eth1Block] {.async, locks: 0.} =
let
blockHash = web3Block.hash
blockId = web3Block.blockId
depositRoot = await p.ns.get_deposit_root.call(blockNumber = web3Block.number.uint64)
rawCount = await p.ns.get_deposit_count.call(blockNumber = web3Block.number.uint64)
depositCount = bytes_to_int(array[8, byte](rawCount))
depositsJson = await p.ns.getJsonLogs(DepositEvent, fromBlock = some(blockId), toBlock = some(blockId))
deposits = readJsonDeposits(depositsJson)
return 0
return Eth1Block(
number: Eth1BlockNumber(web3Block.number),
timestamp: Eth1BlockTimestamp(web3Block.timestamp),
deposits: deposits,
voteData: Eth1Data(deposit_root: depositRoot.asEth2Digest,
deposit_count: depositCount,
block_hash: blockHash.asEth2Digest))
func maxValidDeposits(eth1Chain: Eth1Chain): uint64 =
if eth1Chain.blocks.len > 0:
let lastBlock = eth1Chain.blocks[^1]
lastBlock.knownGoodDepositsCount.get(
lastBlock.voteData.deposit_count - eth1Chain.knownInvalidDepositsCount)
else:
0
method onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {.
gcsafe
locks: 0
# raises: []
.} =
p.web3.onDisconnect = handler
proc processDeposits(m: MainchainMonitor,
dataProvider: DataProviderRef) {.async.} =
# ATTENTION!
# Please note that this code is using a queue to guarantee the
# strict serial order of processing of deposits. If we had the
# same code embedded in the deposit contracts events handler,
# it could easily re-order the steps due to the intruptable
# interleaved execution of async code.
while true:
let blk = await m.depositQueue.popFirst()
m.eth1Chain.trimHeight(Eth1BlockNumber(blk.number) - 1)
method onDepositEvent*(p: Web3DataProviderRef,
startBlock: Eth1BlockNumber,
handler: DepositEventHandler): Future[void] {.
async
gcsafe
locks: 0
# raises: []
.} =
if p.subscription != nil:
await p.subscription.unsubscribe()
let latestKnownBlock = if m.eth1Chain.blocks.len > 0:
m.eth1Chain.blocks[^1].number
elif m.eth1Chain.knownStartBlockNum.isSome:
m.eth1Chain.knownStartBlockNum.get
else:
m.eth1Chain.knownStartBlockNum = some(
await dataProvider.getBlockNumber(m.eth1Chain.knownStart.block_hash.asBlockHash))
m.eth1Chain.knownStartBlockNum.get
debug "Subscribing for deposit events", startBlock
let eth1Blocks = await dataProvider.fetchDepositData(latestKnownBlock + 1,
Eth1BlockNumber blk.number)
if eth1Blocks.len == 0:
if m.eth1Chain.maxValidDeposits > totalDepositsNeededForGenesis:
let latestEth1Data = m.eth1Chain.latestEth1Data
p.subscription = await p.ns.subscribe(
DepositEvent, %*{"fromBlock": &"0x{startBlock:X}"}, handler)
for missingBlockNum in latestKnownBlock + 1 ..< Eth1BlockNumber(blk.number):
let missingBlock = await dataProvider.getBlockByNumber(missingBlockNum)
doAssert m.eth1Chain.addSuccessorBlock Eth1Block(
number: Eth1BlockNumber(missingBlock.number),
timestamp: Eth1BlockTimestamp(missingBlock.timestamp),
voteData: latestEth1Data)
proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} =
debug "Querying block number", hash = $hash
doAssert m.eth1Chain.addSuccessorBlock Eth1Block(
number: Eth1BlockNumber(blk.number),
timestamp: Eth1BlockTimestamp(blk.timestamp),
voteData: latestEth1Data)
else:
template logBlockProcessed(blk) =
info "Eth1 block processed",
`block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count
try:
let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts):
return 0
return Eth1BlockNumber(blk.number)
except CatchableError as exc:
notice "Failed to get Eth1 block number from hash",
hash = $hash, err = exc.msg
raise
await dataProvider.fetchBlockDetails(eth1Blocks[0])
if m.eth1Chain.addSuccessorBlock(eth1Blocks[0]):
logBlockProcessed eth1Blocks[0]
proc new*(T: type Web3DataProvider,
web3Url: string,
depositContractAddress: Address): Future[ref Web3DataProvider] {.
async
# raises: [Defect]
.} =
try:
type R = ref T
let
web3 = await newWeb3(web3Url)
ns = web3.contractSender(DepositContract, depositContractAddress)
return R(url: web3Url, web3: web3, ns: ns)
except CatchableError:
return nil
for i in 1 ..< eth1Blocks.len:
await dataProvider.fetchBlockDetails(eth1Blocks[i])
if m.eth1Chain.addSuccessorBlock(eth1Blocks[i]):
logBlockProcessed eth1Blocks[i]
else:
raise newException(CorruptDataProvider,
"A non-successor Eth1 block reported")
else:
# A non-continuous chain detected.
# This could be the result of a deeper fork that was not reported
# properly by the web3 provider. Since this should be an extremely
# rare event we can afford to handle it in a relatively inefficient
# manner. Let's delete half of our non-finalized chain and try again.
let blocksToPop = max(1, m.eth1Chain.totalNonFinalizedBlocks div 2)
warn "Web3 provider responded with a non-continous chain of deposits.",
backtrackedDeposits = blocksToPop
for i in 0 ..< blocksToPop:
m.eth1Chain.popBlock()
m.depositQueue.addFirstNoWait blk
func web3Provider*(web3Url: string): DataProviderFactory =
proc factory(depositContractAddress: Address): Future[DataProviderRef] {.async.} =
result = await Web3DataProvider.new(web3Url, depositContractAddress)
DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory)
proc isRunning*(m: MainchainMonitor): bool =
not m.runFut.isNil
func `===`(json: JsonNode, boolean: bool): bool =
json.kind == JBool and json.bval == boolean
@ -589,43 +673,36 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
raise newException(CatchableError, "Failed to initialize Eth1 data provider")
try:
let startBlkNum = if m.startBlock.isSome:
await dataProvider.getBlockNumber(m.startBlock.get.asBlockHash)
else:
0
info "Monitoring eth1 deposits",
fromBlock = startBlkNum.uint64,
contract = $m.depositContractAddress,
url = m.dataProviderFactory.desc
await dataProvider.onDepositEvent(Eth1BlockNumber(startBlkNum)) do (
pubkey: Bytes48,
withdrawalCredentials: Bytes32,
amount: Bytes8,
signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode)
{.raises: [Defect], gcsafe.}:
await dataProvider.onBlockHeaders do (blk: BlockHeader)
{.raises: [Defect], gcsafe}:
try:
let
blockHash = BlockHash.fromHex(j["blockHash"].getStr())
eventType = if j{"removed"} === true: RemovedEvent
else: NewEvent
m.depositQueue.addLastNoWait(blk)
except AsyncQueueFullError:
raiseAssert "The depositQueue has no size limit"
except Exception:
# TODO Investigate why this exception is being raised
raiseAssert "queue.addLastNoWait should not raise exceptions"
do (err: CatchableError):
debug "Error while processing Eth1 block headers subscription", err = err.msg
m.depositQueue.addLastNoWait((blockHash, eventType))
await m.processDeposits(dataProvider)
except CatchableError as exc:
warn "Received invalid deposit", err = exc.msg, j
except Exception as err:
# chronos still raises exceptions which inherit directly from Exception
if err[] of Defect:
raise (ref Defect)(err)
else:
warn "Received invalid deposit", err = err.msg, j
await m.processDeposits(dataProvider, startBlkNum)
finally:
await close(dataProvider)
proc safeCancel(fut: var Future[void]) =
if not fut.isNil:
fut.cancel()
fut = nil
proc stop*(m: MainchainMonitor) =
safeCancel m.runFut
safeCancel m.genesisMonitoringFut
proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
if m.runFut.isNil:
let runFut = m.run(delayBeforeStart)
@ -635,7 +712,7 @@ proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
if runFut.error[] of CatchableError:
if runFut == m.runFut:
error "Mainchain monitor failure, restarting", err = runFut.error.msg
m.runFut = nil
m.stop()
m.start(5.seconds)
else:
fatal "Fatal exception reached", err = runFut.error.msg
@ -644,11 +721,6 @@ proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
proc start*(m: MainchainMonitor) {.inline.} =
m.start(0.seconds)
proc stop*(m: MainchainMonitor) =
if not m.runFut.isNil:
m.runFut.cancel()
m.runFut = nil
proc getLatestEth1BlockHash*(url: string): Future[Eth2Digest] {.async.} =
let web3 = await newWeb3(url)
try:

View File

@ -83,7 +83,7 @@ proc getMerkleProof*[Depth: static int](tree: SparseMerkleTree[Depth],
else:
result[depth] = zeroHashes[depth]
proc attachMerkleProofs*(deposits: var seq[Deposit]) =
proc attachMerkleProofs*(deposits: var openarray[Deposit]) =
let deposit_data_roots = mapIt(deposits, it.data.hash_tree_root)
var
deposit_data_sums: seq[Eth2Digest]

View File

@ -83,7 +83,9 @@ proc process_deposit*(
if not verify_deposit_signature(deposit.data):
# It's ok that deposits fail - they get included in blocks regardless
# TODO spec test?
debug "Skipping deposit with invalid signature",
# TODO: This is temporary set to trace level in order to deal with the
# large number of invalid deposits on Altona
trace "Skipping deposit with invalid signature",
deposit = shortLog(deposit.data)
return true
@ -192,12 +194,20 @@ proc slash_validator*(state: var BeaconState, slashed_index: ValidatorIndex,
increase_balance(
state, whistleblower_index, whistleblowing_reward - proposer_reward)
func genesis_time_from_eth1_timestamp*(eth1_timestamp: uint64): uint64 =
# TODO: remove once we switch completely to v0.12.1
when SPEC_VERSION == "0.12.1":
eth1_timestamp + GENESIS_DELAY
else:
const SECONDS_PER_DAY = uint64(60*60*24)
eth1_timestamp + 2'u64 * SECONDS_PER_DAY - (eth1_timestamp mod SECONDS_PER_DAY)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/beacon-chain.md#genesis
proc initialize_beacon_state_from_eth1*(
eth1_block_hash: Eth2Digest,
eth1_timestamp: uint64,
deposits: openArray[Deposit],
flags: UpdateFlags = {}): BeaconStateRef {.nbench.}=
flags: UpdateFlags = {}): BeaconStateRef {.nbench.} =
## Get the genesis ``BeaconState``.
##
## Before the beacon chain starts, validators will register in the Eth1 chain
@ -214,19 +224,12 @@ proc initialize_beacon_state_from_eth1*(
# at that point :)
doAssert deposits.len >= SLOTS_PER_EPOCH
const SECONDS_PER_DAY = uint64(60*60*24)
var state = BeaconStateRef(
fork: Fork(
previous_version: Version(GENESIS_FORK_VERSION),
current_version: Version(GENESIS_FORK_VERSION),
epoch: GENESIS_EPOCH),
genesis_time:
# TODO: remove once we switch completely to v0.12.1
when SPEC_VERSION == "0.12.1":
eth1_timestamp + GENESIS_DELAY
else:
eth1_timestamp + 2'u64 * SECONDS_PER_DAY -
(eth1_timestamp mod SECONDS_PER_DAY),
genesis_time: genesis_time_from_eth1_timestamp(eth1_timestamp),
eth1_data:
Eth1Data(block_hash: eth1_block_hash, deposit_count: uint64(len(deposits))),
latest_block_header:
@ -280,11 +283,11 @@ proc initialize_hashed_beacon_state_from_eth1*(
eth1_block_hash, eth1_timestamp, deposits, flags)
HashedBeaconState(data: genesisState[], root: hash_tree_root(genesisState[]))
func is_valid_genesis_state*(state: BeaconState): bool =
func is_valid_genesis_state*(state: BeaconState, active_validator_indices: seq[ValidatorIndex]): bool =
if state.genesis_time < MIN_GENESIS_TIME:
return false
# This is an okay get_active_validator_indices(...) for the time being.
if len(get_active_validator_indices(state, GENESIS_EPOCH)) < MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
if len(active_validator_indices) < MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
return false
return true

2
vendor/nim-web3 vendored

@ -1 +1 @@
Subproject commit 694ff2ad74b36f7f8402235f92134cb5532661d4
Subproject commit a75519fe1264ea861fb65eb2ffec1d6566ebd033