Simplify the mainchain monitor

This commit is contained in:
Zahary Karadjov 2020-10-14 17:04:08 +03:00 committed by zah
parent ce1fda1195
commit 2152dc6136
4 changed files with 117 additions and 241 deletions

View File

@ -132,6 +132,7 @@ proc createSeq*(db: SqStoreRef, baseDir, name: string, T: type): DbSeq[T] =
proc add*[T](s: var DbSeq[T], val: T) =
var bytes = SSZ.encode(val)
s.insertStmt.exec(bytes).expect "working database"
inc s.recordCount
template len*[T](s: DbSeq[T]): uint64 =
s.recordCount.uint64
@ -274,7 +275,7 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
proc putGenesisBlockRoot*(db: BeaconChainDB, key: Eth2Digest) =
db.put(subkey(kGenesisBlockRoot), key)
proc putEth1PersistedTo*(db: BeaconChainDB, key: Eth2Digest) =
proc putEth1PersistedTo*(db: BeaconChainDB, key: Eth1Data) =
db.put(subkey(kEth1PersistedTo), key)
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[TrustedSignedBeaconBlock] =
@ -323,9 +324,10 @@ proc getGenesisBlockRoot*(db: BeaconChainDB): Eth2Digest =
db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect(
"The database must be seeded with the genesis state")
proc getEth1PersistedTo*(db: BeaconChainDB): Eth2Digest =
db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect(
"The database must be seeded with genesis eth1 data")
proc getEth1PersistedTo*(db: BeaconChainDB): Opt[Eth1Data] =
result.ok(Eth1Data())
if db.get(subkey(kEth1PersistedTo), result.get) != GetResult.found:
result.err()
proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(SignedBeaconBlock, key)).expect("working database")

View File

@ -69,6 +69,27 @@ func enrForkIdFromState(state: BeaconState): ENRForkID =
next_fork_version: forkVer,
next_fork_epoch: FAR_FUTURE_EPOCH)
proc startMainchainMonitor(db: BeaconChainDB,
conf: BeaconNodeConf): Future[MainchainMonitor] {.async.} =
let mainchainMonitorRes = await MainchainMonitor.init(
db,
conf.runtimePreset,
conf.web3Url,
conf.depositContractAddress.get,
conf.depositContractDeployedAt.get)
result = if mainchainMonitorRes.isOk:
mainchainMonitorRes.get
else:
fatal "Failed to start Eth1 monitor",
reason = mainchainMonitorRes.error,
web3Url = conf.web3Url,
depositContractAddress = conf.depositContractAddress.get,
depositContractDeployedAt = conf.depositContractDeployedAt.get
quit 1
result.start()
proc init*(T: type BeaconNode,
rng: ref BrHmacDrbgContext,
conf: BeaconNodeConf,
@ -139,30 +160,9 @@ proc init*(T: type BeaconNode,
fatal "Deposit contract deployment block not specified"
quit 1
let web3 = web3Provider(conf.web3Url)
let deployedAtAsHash =
if conf.depositContractDeployedAt.get.startsWith "0x":
try: BlockHash.fromHex conf.depositContractDeployedAt.get
except ValueError:
fatal "Invalid hex value specified for deposit-contract-block"
quit 1
else:
let blockNum = try: parseBiggestUInt conf.depositContractDeployedAt.get
except ValueError:
fatal "Invalid nummeric value for deposit-contract-block"
quit 1
await getEth1BlockHash(conf.web3Url, blockId blockNum)
# TODO Could move this to a separate "GenesisMonitor" process or task
# that would do only this - see Paul's proposal for this.
mainchainMonitor = MainchainMonitor.init(
db,
conf.runtimePreset,
web3,
conf.depositContractAddress.get,
Eth1Data(block_hash: deployedAtAsHash.asEth2Digest, deposit_count: 0))
mainchainMonitor.start()
mainchainMonitor = await startMainchainMonitor(db, conf)
genesisState = await mainchainMonitor.waitGenesis()
if bnStatus == BeaconNodeStatus.Stopping:
@ -232,16 +232,11 @@ proc init*(T: type BeaconNode,
if mainchainMonitor.isNil and
conf.web3Url.len > 0 and
conf.depositContractAddress.isSome:
mainchainMonitor = MainchainMonitor.init(
db,
conf.runtimePreset,
web3Provider(conf.web3Url),
conf.depositContractAddress.get,
chainDag.headState.data.data.eth1_data)
# TODO if we don't have any validators attached, we don't need a mainchain
# monitor
mainchainMonitor.start()
conf.depositContractAddress.isSome and
conf.depositContractDeployedAt.isSome:
# TODO if we don't have any validators attached,
# we don't need a mainchain monitor
mainchainMonitor = await startMainchainMonitor(db, conf)
let rpcServer = if conf.rpcEnabled:
RpcServer.init(conf.rpcAddress, conf.rpcPort)

View File

@ -1,5 +1,5 @@
import
std/[deques, tables, hashes, options, strformat],
std/[deques, tables, hashes, options, strformat, strutils],
chronos, web3, web3/ethtypes as web3Types, json, chronicles,
eth/common/eth_types, eth/async_utils,
spec/[datatypes, digest, crypto, beaconstate, helpers, signatures],
@ -51,31 +51,19 @@ type
MainchainMonitor* = ref object
db: BeaconChainDB
preset: RuntimePreset
depositContractAddress: Address
dataProviderFactory*: DataProviderFactory
dataProvider: Web3DataProviderRef
depositQueue: AsyncQueue[Eth1BlockHeader]
eth1Chain: Eth1Chain
genesisState: NilableBeaconStateRef
genesisStateFut: Future[void]
genesisMonitoringFut: Future[void]
eth1Chain: Eth1Chain
depositQueue: AsyncQueue[Eth1BlockHeader]
runFut: Future[void]
DataProvider* = object of RootObj
DataProviderRef* = ref DataProvider
DataProviderFactory* = object
desc: string
new: proc(depositContractAddress: Address): Future[DataProviderRef] {.
gcsafe
# raises: [Defect]
.}
Web3DataProvider* = object of DataProvider
Web3DataProvider* = object
url: string
web3: Web3
ns: Sender[DepositContract]
@ -97,15 +85,17 @@ type
const
web3Timeouts = 5.seconds
template depositContractAddress(m: MainchainMonitor): Eth1Address =
m.dataProvider.ns.contractAddress
template web3Url(m: MainchainMonitor): string =
m.dataProvider.url
# TODO: Add preset validation
# MIN_GENESIS_ACTIVE_VALIDATOR_COUNT should be larger than SLOTS_PER_EPOCH
# doAssert SECONDS_PER_ETH1_BLOCK * preset.ETH1_FOLLOW_DISTANCE < GENESIS_DELAY,
# "Invalid configuration: GENESIS_DELAY is set too low"
# TODO Nim's analysis on the lock level of the methods in this
# module seems broken. Investigate and file this as an issue.
{.push warning[LockLevel]: off.}
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#get_eth1_data
func compute_time_at_slot(state: BeaconState, slot: Slot): uint64 =
state.genesis_time + slot * SECONDS_PER_SLOT
@ -209,102 +199,19 @@ func clear*(eth1Chain: var Eth1Chain) =
template hash*(x: Eth1Block): Hash =
hash(x.voteData.block_hash.data)
template notImplemented =
doAssert false, "Method not implemented"
method getBlockByHash*(p: DataProviderRef, hash: BlockHash): Future[BlockObject] {.
base
gcsafe
locks: 0
# raises: [Defect]
.} =
notImplemented
method getBlockByNumber*(p: DataProviderRef, hash: Eth1BlockNumber): Future[BlockObject] {.
base
gcsafe
locks: 0
# raises: [Defect]
.} =
notImplemented
method onDisconnect*(p: DataProviderRef, handler: DisconnectHandler) {.
base
gcsafe
locks: 0
# raises: []
.} =
notImplemented
method onBlockHeaders*(p: DataProviderRef,
blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler): Future[void] {.
base
gcsafe
locks: 0
# raises: []
.} =
notImplemented
method close*(p: DataProviderRef): Future[void] {.
base
gcsafe
locks: 0
# raises: [Defect]
.} =
notImplemented
method fetchDepositData*(p: DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]] {.
base
gcsafe
locks: 0
# raises: [Defect, CatchableError]
.} =
notImplemented
method fetchBlockDetails(p: DataProviderRef, blk: Eth1Block): Future[void] {.
base
gcsafe
locks: 0
# raises: [Defect, CatchableError]
.} =
notImplemented
proc new*(T: type Web3DataProvider,
web3Url: string,
depositContractAddress: Address): Future[ref Web3DataProvider] {.
async
# raises: [Defect]
.} =
try:
type R = ref T
let
web3 = await newWeb3(web3Url)
ns = web3.contractSender(DepositContract, depositContractAddress)
return R(url: web3Url, web3: web3, ns: ns)
except CatchableError:
return nil
func web3Provider*(web3Url: string): DataProviderFactory =
proc factory(depositContractAddress: Address): Future[DataProviderRef] {.async.} =
result = await Web3DataProvider.new(web3Url, depositContractAddress)
DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory)
method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
proc close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe()
await p.web3.close()
method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
proc getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
return p.web3.provider.eth_getBlockByHash(hash, false)
method getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] =
proc getBlockByNumber*(p: Web3DataProviderRef, number: Eth1BlockNumber): Future[BlockObject] =
return p.web3.provider.eth_getBlockByNumber(&"0x{number:X}", false)
proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} =
proc getBlockNumber(p: Web3DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber] {.async.} =
try:
let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts):
return 0
@ -362,15 +269,15 @@ proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] =
amount: bytes_to_uint64(array[8, byte](amount)),
signature: ValidatorSig.init(array[96, byte](signature))))
method fetchDepositData*(p: Web3DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]]
{.async, locks: 0.} =
proc fetchDepositData*(p: Web3DataProviderRef,
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]]
{.async, locks: 0.} =
info "Obtaining deposit log events", fromBlock, toBlock
return readJsonDeposits(await p.ns.getJsonLogs(DepositEvent,
fromBlock = some blockId(fromBlock),
toBlock = some blockId(toBlock)))
method fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
proc fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
let
web3Block = p.getBlockByNumber(blk.number)
depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
@ -386,16 +293,16 @@ method fetchBlockDetails(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
blk.voteData.deposit_count = depositCount
blk.voteData.deposit_root = depositRoot.read.asEth2Digest
method onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {.
proc onDisconnect*(p: Web3DataProviderRef, handler: DisconnectHandler) {.
gcsafe
locks: 0
# raises: []
.} =
p.web3.onDisconnect = handler
method onBlockHeaders*(p: Web3DataProviderRef,
blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler): Future[void] {.
proc onBlockHeaders*(p: Web3DataProviderRef,
blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler): Future[void] {.
async
gcsafe
locks: 0
@ -456,30 +363,53 @@ proc getBlockProposalData*(m: MainchainMonitor,
proc init*(T: type MainchainMonitor,
db: BeaconChainDB,
preset: RuntimePreset,
dataProviderFactory: DataProviderFactory,
web3Url: string,
depositContractAddress: Eth1Address,
startPosition: Eth1Data): T =
T(db: db,
depositContractDeployedAt: string): Future[Result[T, string]] {.async.} =
let web3 = try: await newWeb3(web3Url)
except CatchableError as err:
return err "Failed to setup web3 connection"
let
ns = web3.contractSender(DepositContract, depositContractAddress)
dataProvider = Web3DataProviderRef(url: web3Url, web3: web3, ns: ns)
let
previouslyPersistedTo = db.getEth1PersistedTo()
knownStart = previouslyPersistedTo.get:
# `previouslyPersistedTo` wall null, we start from scratch
let deployedAtHash = if depositContractDeployedAt.startsWith "0x":
try: BlockHash.fromHex depositContractDeployedAt
except ValueError:
return err "Invalid hex value specified for deposit-contract-block"
else:
let blockNum = try: parseBiggestUInt depositContractDeployedAt
except ValueError:
return err "Invalid nummeric value for deposit-contract-block"
try:
let blk = await dataProvider.getBlockByNumber(blockNum)
blk.hash
except CatchableError:
return err("Failed to obtain block hash for block number " & $blockNum)
Eth1Data(block_hash: deployedAtHash.asEth2Digest, deposit_count: 0)
return ok T(
db: db,
preset: preset,
dataProvider: dataProvider,
depositQueue: newAsyncQueue[Eth1BlockHeader](),
dataProviderFactory: dataProviderFactory,
depositContractAddress: Address depositContractAddress,
eth1Chain: Eth1Chain(knownStart: startPosition))
eth1Chain: Eth1Chain(knownStart: knownStart))
proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
genesisBlock: Eth1Block,
previousBlock: Eth1Block
] =
if m.eth1Chain.blocks.len == 0:
return
let followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK) *
m.preset.ETH1_FOLLOW_DISTANCE
var prevBlock: Eth1Block
# TODO: The DB operations should be executed as a transaction here
block: # TODO Begin Transaction
while true:
while m.eth1Chain.blocks.len > 0:
let blk = m.eth1Chain.blocks.peekFirst
if float(blk.timestamp + followDistanceInSeconds) > timeNow:
break
@ -496,6 +426,8 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
withdrawal_credentials: deposit.data.withdrawal_credentials)
m.db.validatorsByKey.insert(pubkey, ValidatorIndex idx)
# TODO The len property is currently stored in memory which
# makes it unsafe in the face of failed transactions
blk.knownValidatorsCount = some m.db.validators.len
discard m.eth1Chain.blocks.popFirst()
@ -511,7 +443,11 @@ proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
if prevBlock != nil:
# TODO commit transaction
m.db.putEth1PersistedTo prevBlock.voteData.block_hash
m.db.putEth1PersistedTo prevBlock.voteData
m.eth1Chain.knownStart = prevBlock.voteData
notice "Eth1 sync progress",
blockNumber = prevBlock.number,
depositsProcessed = prevBlock.voteData.deposit_count
# TODO Commit
@ -540,12 +476,6 @@ proc signalGenesis(m: MainchainMonitor, genesisState: BeaconStateRef) =
proc findGenesisBlockInRange(m: MainchainMonitor,
startBlock, endBlock: Eth1Block): Future[Eth1Block]
{.async.} =
let dataProvider = await m.dataProviderFactory.new(m.depositContractAddress)
if dataProvider == nil:
error "Failed to initialize Eth1 data provider",
provider = m.dataProviderFactory.desc
raise newException(CatchableError, "Failed to initialize Eth1 data provider")
var
startBlock = startBlock
endBlock = endBlock
@ -559,7 +489,7 @@ proc findGenesisBlockInRange(m: MainchainMonitor,
float(endBlock.number - startBlock.number)
blocksToJump = max(float(MIN_GENESIS_TIME - startBlockTime) / secondsPerBlock, 1.0)
candidateNumber = min(endBlock.number - 1, startBlock.number + 1) # blocksToJump.uint64)
candidateBlock = await dataProvider.getBlockByNumber(candidateNumber)
candidateBlock = await m.dataProvider.getBlockByNumber(candidateNumber)
var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64,
timestamp: candidateBlock.timestamp.uint64,
@ -686,7 +616,7 @@ func maxValidDeposits(eth1Chain: Eth1Chain): uint64 =
0
proc processDeposits(m: MainchainMonitor,
dataProvider: DataProviderRef) {.async.} =
dataProvider: Web3DataProviderRef) {.async.} =
# ATTENTION!
# Please note that this code is using a queue to guarantee the
# strict serial order of processing of deposits. If we had the
@ -696,6 +626,9 @@ proc processDeposits(m: MainchainMonitor,
while true:
m.checkIfShouldStopMainchainMonitor()
let now = epochTime()
discard m.persistFinalizedBlocks(now)
let blk = await m.depositQueue.popFirst()
m.eth1Chain.trimHeight(Eth1BlockNumber(blk.number) - 1)
@ -728,8 +661,8 @@ proc processDeposits(m: MainchainMonitor,
voteData: latestEth1Data)
else:
template logBlockProcessed(blk) =
info "Eth1 block processed",
`block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count
debug "Eth1 block processed",
`block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count
await dataProvider.fetchBlockDetails(eth1Blocks[0])
if m.eth1Chain.addSuccessorBlock(eth1Blocks[0]):
@ -767,33 +700,22 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
if delayBeforeStart != ZeroDuration:
await sleepAsync(delayBeforeStart)
let dataProvider = await m.dataProviderFactory.new(m.depositContractAddress)
if dataProvider == nil:
error "Failed to initialize Eth1 data provider",
provider = m.dataProviderFactory.desc
raise newException(CatchableError, "Failed to initialize Eth1 data provider")
info "Starting Eth1 deposit contract monitoring",
contract = $m.depositContractAddress, url = m.web3Url
try:
info "Starting Eth1 deposit contract monitoring",
contract = $m.depositContractAddress,
url = m.dataProviderFactory.desc
await dataProvider.onBlockHeaders do (blk: Eth1BlockHeader)
await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader)
{.raises: [Defect], gcsafe.}:
try:
m.depositQueue.addLastNoWait(blk)
except AsyncQueueFullError:
raiseAssert "The depositQueue has no size limit"
except Exception:
# TODO Investigate why this exception is being raised
raiseAssert "queue.addLastNoWait should not raise exceptions"
do (err: CatchableError):
debug "Error while processing Eth1 block headers subscription", err = err.msg
try:
m.depositQueue.addLastNoWait(blk)
except AsyncQueueFullError:
raiseAssert "The depositQueue has no size limit"
except Exception:
# TODO Investigate why this exception is being raised
raiseAssert "queue.addLastNoWait should not raise exceptions"
do (err: CatchableError):
debug "Error while processing Eth1 block headers subscription", err = err.msg
await m.processDeposits(dataProvider)
finally:
await close(dataProvider)
await m.processDeposits(m.dataProvider)
proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
if m.runFut.isNil:
@ -821,4 +743,3 @@ proc getEth1BlockHash*(url: string, blockId: RtBlockIdentifier): Future[BlockHas
finally:
await web3.close()
{.pop.}

View File

@ -5,48 +5,6 @@ import
chronos, web3/ethtypes,
../beacon_chain/mainchain_monitor
type
MockDataProvider = ref object of DataProvider
method getBlockByHash*(p: MockDataProvider, hash: BlockHash): Future[BlockObject] {.
async
gcsafe
# raises: [Defect]
.} =
return BlockObject()
method onDisconnect*(p: MockDataProvider, handler: DisconnectHandler) {.
async
gcsafe
# raises: []
.} =
discard
method onDepositEvent*(p: MockDataProvider,
startBlock: Eth1BlockNumber,
handler: DepositEventHandler): Future[void] {.
async
gcsafe
# raises: []
.} =
discard
method close*(p: MockDataProvider): Future[void] {.
async
gcsafe
# raises: [Defect]
.} =
discard
method fetchDepositData*(p: MockDataProvider,
web3Block: BlockObject): Future[Eth1Block] {.
async
gcsafe
# raises: [Defect, CatchableError]
.} =
return Eth1Block()
suite "Eth1 Chain":
discard