Add polling support in the Eth1Monitor (extracted from the merge branch)

This commit is contained in:
Zahary Karadjov 2021-11-25 18:51:51 +02:00 committed by zah
parent ff50670147
commit ef1de66316
3 changed files with 78 additions and 28 deletions

View File

@ -128,6 +128,12 @@ type
desc: "One or more Web3 provider URLs used for obtaining deposit contract data" desc: "One or more Web3 provider URLs used for obtaining deposit contract data"
name: "web3-url" }: seq[string] name: "web3-url" }: seq[string]
web3ForcePolling* {.
hidden
defaultValue: false
desc: "Force the use of polling when determining the head block of Eth1"
name: "web3-force-polling" }: bool
nonInteractive* {. nonInteractive* {.
desc: "Do not display interative prompts. Quit on missing configuration" desc: "Do not display interative prompts. Quit on missing configuration"
name: "non-interactive" }: bool name: "non-interactive" }: bool

View File

@ -96,11 +96,12 @@ type
web3Urls: seq[string] web3Urls: seq[string]
eth1Network: Option[Eth1Network] eth1Network: Option[Eth1Network]
depositContractAddress*: Eth1Address depositContractAddress*: Eth1Address
forcePolling: bool
dataProvider: Web3DataProviderRef dataProvider: Web3DataProviderRef
latestEth1Block: Option[FullBlockId]
depositsChain: Eth1Chain depositsChain: Eth1Chain
latestEth1BlockNumber: Eth1BlockNumber
eth1Progress: AsyncEvent eth1Progress: AsyncEvent
runFut: Future[void] runFut: Future[void]
@ -120,6 +121,10 @@ type
Web3DataProviderRef* = ref Web3DataProvider Web3DataProviderRef* = ref Web3DataProvider
FullBlockId* = object
number: Eth1BlockNumber
hash: BlockHash
DataProviderFailure* = object of CatchableError DataProviderFailure* = object of CatchableError
CorruptDataProvider* = object of DataProviderFailure CorruptDataProvider* = object of DataProviderFailure
DataProviderTimeout* = object of DataProviderFailure DataProviderTimeout* = object of DataProviderFailure
@ -243,14 +248,16 @@ template finalizedDepositsMerkleizer(m: Eth1Monitor): auto =
m.depositsChain.finalizedDepositsMerkleizer m.depositsChain.finalizedDepositsMerkleizer
proc fixupWeb3Urls*(web3Url: var string) = proc fixupWeb3Urls*(web3Url: var string) =
let normalizedUrl = toLowerAscii(web3Url) var normalizedUrl = toLowerAscii(web3Url)
if not (normalizedUrl.startsWith("https://") or if not (normalizedUrl.startsWith("https://") or
normalizedUrl.startsWith("http://") or normalizedUrl.startsWith("http://") or
normalizedUrl.startsWith("wss://") or normalizedUrl.startsWith("wss://") or
normalizedUrl.startsWith("ws://")): normalizedUrl.startsWith("ws://")):
web3Url = "ws://" & web3Url normalizedUrl = "ws://" & normalizedUrl
warn "The Web3 URL does not specify a protocol. Assuming a WebSocket server", web3Url warn "The Web3 URL does not specify a protocol. Assuming a WebSocket server", web3Url
return
# We do this at the end in order to allow the warning above to print the original value
web3Url = normalizedUrl
template toGaugeValue(x: Quantity): int64 = template toGaugeValue(x: Quantity): int64 =
toGaugeValue(distinctBase x) toGaugeValue(distinctBase x)
@ -796,7 +803,8 @@ proc init*(T: type Eth1Monitor,
db: BeaconChainDB, db: BeaconChainDB,
web3Urls: seq[string], web3Urls: seq[string],
depositContractSnapshot: DepositContractSnapshot, depositContractSnapshot: DepositContractSnapshot,
eth1Network: Option[Eth1Network]): T = eth1Network: Option[Eth1Network],
forcePolling: bool): T =
doAssert web3Urls.len > 0 doAssert web3Urls.len > 0
var web3Urls = web3Urls var web3Urls = web3Urls
@ -810,7 +818,8 @@ proc init*(T: type Eth1Monitor,
depositContractAddress: cfg.DEPOSIT_CONTRACT_ADDRESS, depositContractAddress: cfg.DEPOSIT_CONTRACT_ADDRESS,
web3Urls: web3Urls, web3Urls: web3Urls,
eth1Network: eth1Network, eth1Network: eth1Network,
eth1Progress: newAsyncEvent()) eth1Progress: newAsyncEvent(),
forcePolling: forcePolling)
proc safeCancel(fut: var Future[void]) = proc safeCancel(fut: var Future[void]) =
if not fut.isNil and not fut.finished: if not fut.isNil and not fut.finished:
@ -826,7 +835,7 @@ proc resetState(m: Eth1Monitor) {.async.} =
safeCancel m.runFut safeCancel m.runFut
m.depositsChain.clear() m.depositsChain.clear()
m.latestEth1BlockNumber = 0 m.latestEth1Block = none(FullBlockId)
if m.dataProvider != nil: if m.dataProvider != nil:
await m.dataProvider.close() await m.dataProvider.close()
@ -844,11 +853,15 @@ proc stop*(m: Eth1Monitor) {.async.} =
const const
votedBlocksSafetyMargin = 50 votedBlocksSafetyMargin = 50
func latestEth1BlockNumber(m: Eth1Monitor): Eth1BlockNumber =
if m.latestEth1Block.isSome:
Eth1BlockNumber m.latestEth1Block.get.number
else:
Eth1BlockNumber 0
func earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber = func earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber =
m.latestEth1BlockNumber - (2 * m.cfg.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin m.latestEth1BlockNumber - (2 * m.cfg.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin
proc syncBlockRange(m: Eth1Monitor, proc syncBlockRange(m: Eth1Monitor,
merkleizer: ref DepositsMerkleizer, merkleizer: ref DepositsMerkleizer,
fromBlock, toBlock, fromBlock, toBlock,
@ -1017,6 +1030,9 @@ proc syncBlockRange(m: Eth1Monitor,
m.signalGenesis m.createGenesisState(genesisBlock) m.signalGenesis m.createGenesisState(genesisBlock)
func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T =
FullBlockId(number: Eth1BlockNumber blk.number, hash: blk.hash)
proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} = proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if m.state == Failed: if m.state == Failed:
await m.resetState() await m.resetState()
@ -1052,19 +1068,30 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
quit 1 quit 1
m.state = Started m.state = Started
var mustUsePolling = m.forcePolling or
web3Url.startsWith("http://") or
web3Url.startsWith("https://")
await m.dataProvider.onBlockHeaders do (blk: Eth1BlockHeader) if not mustUsePolling:
{.raises: [Defect], gcsafe.}: proc newBlockHeadersHandler(blk: Eth1BlockHeader)
try: {.raises: [Defect], gcsafe.} =
if blk.number.uint64 > m.latestEth1BlockNumber: try:
eth1_latest_head.set blk.number.toGaugeValue if blk.number.uint64 > m.latestEth1BlockNumber:
m.latestEth1BlockNumber = Eth1BlockNumber blk.number eth1_latest_head.set blk.number.toGaugeValue
m.eth1Progress.fire() m.latestEth1Block = some FullBlockId.init(blk)
except Exception: m.eth1Progress.fire()
# TODO Investigate why this exception is being raised except Exception:
raiseAssert "AsyncEvent.fire should not raise exceptions" # TODO Investigate why this exception is being raised
do (err: CatchableError): raiseAssert "AsyncEvent.fire should not raise exceptions"
debug "Error while processing Eth1 block headers subscription", err = err.msg
proc subscriptionErrorHandler(err: CatchableError)
{.raises: [Defect], gcsafe.} =
warn "Failed to subscribe for block headers. Switching to polling",
web3Url, err = err.msg
mustUsePolling = true
await m.dataProvider.onBlockHeaders(newBlockHeadersHandler,
subscriptionErrorHandler)
let startBlock = awaitWithRetries( let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash)) m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))
@ -1099,10 +1126,23 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if m.depositsChain.hasConsensusViolation: if m.depositsChain.hasConsensusViolation:
raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus") raise newException(CorruptDataProvider, "Eth1 chain contradicts Eth2 consensus")
awaitWithTimeout(m.eth1Progress.wait(), 5.minutes): if mustUsePolling:
raise newException(CorruptDataProvider, "No eth1 chain progress for too long") let blk = awaitWithRetries(
m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false))
m.eth1Progress.clear() let fullBlockId = FullBlockId.init(blk)
if m.latestEth1Block.isSome and
m.latestEth1Block.get == fullBlockId:
await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds)
continue
m.latestEth1Block = some fullBlockId
else:
awaitWithTimeout(m.eth1Progress.wait(), 5.minutes):
raise newException(CorruptDataProvider, "No eth1 chain progress for too long")
m.eth1Progress.clear()
if m.latestEth1BlockNumber <= m.cfg.ETH1_FOLLOW_DISTANCE: if m.latestEth1BlockNumber <= m.cfg.ETH1_FOLLOW_DISTANCE:
continue continue
@ -1182,7 +1222,8 @@ when hasGenesisDetection:
web3Urls: seq[string], web3Urls: seq[string],
depositContractAddress: Eth1Address, depositContractAddress: Eth1Address,
depositContractDeployedAt: BlockHashOrNumber, depositContractDeployedAt: BlockHashOrNumber,
eth1Network: Option[Eth1Network]): Future[Result[T, string]] {.async.} = eth1Network: Option[Eth1Network],
forcePolling: bool): Future[Result[T, string]] {.async.} =
doAssert web3Urls.len > 0 doAssert web3Urls.len > 0
try: try:
var urlIdx = 0 var urlIdx = 0
@ -1226,7 +1267,8 @@ when hasGenesisDetection:
web3Urls, web3Urls,
depositContractAddress, depositContractAddress,
depositContractSnapshot, depositContractSnapshot,
eth1Network) eth1Network,
forcePolling)
for i in 0 ..< db.genesisDeposits.len: for i in 0 ..< db.genesisDeposits.len:
monitor.produceDerivedData db.genesisDeposits.get(i) monitor.produceDerivedData db.genesisDeposits.get(i)

View File

@ -224,7 +224,8 @@ proc init*(T: type BeaconNode,
db, db,
config.web3Urls, config.web3Urls,
depositContractDeployedAt, depositContractDeployedAt,
eth1Network) eth1Network,
config.web3ForcePolling)
if eth1MonitorRes.isErr: if eth1MonitorRes.isErr:
fatal "Failed to start Eth1 monitor", fatal "Failed to start Eth1 monitor",
@ -342,7 +343,8 @@ proc init*(T: type BeaconNode,
db, db,
config.web3Urls, config.web3Urls,
genesisDepositsSnapshot, genesisDepositsSnapshot,
eth1Network) eth1Network,
config.web3ForcePolling)
let rpcServer = if config.rpcEnabled: let rpcServer = if config.rpcEnabled:
RpcServer.init(config.rpcAddress, config.rpcPort) RpcServer.init(config.rpcAddress, config.rpcPort)