import EL deposits even when EL is stuck (#3956)
* import EL deposits even when EL is stuck The `eth1_monitor` only starts importing deposits once the EL reports a new head block. However, the EL may be stuck at a block, e.g., the TTD. By polling the latest EL block once after subscribing to new EL block events it is ensured that deposits are still imported in this situation. * also poll once on re-connects * update `eth1_latest_head` metric in poll mode * add comment about similar polling vs events parts * replace check with assert * `isNewLastBlock` helper
This commit is contained in:
parent
b1974d90eb
commit
052f9edfd4
|
@ -1314,6 +1314,9 @@ proc syncBlockRange(m: Eth1Monitor,
|
||||||
func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T =
|
func init(T: type FullBlockId, blk: Eth1BlockHeader|BlockObject): T =
|
||||||
FullBlockId(number: Eth1BlockNumber blk.number, hash: blk.hash)
|
FullBlockId(number: Eth1BlockNumber blk.number, hash: blk.hash)
|
||||||
|
|
||||||
|
func isNewLastBlock(m: Eth1Monitor, blk: Eth1BlockHeader|BlockObject): bool =
|
||||||
|
blk.number.uint64 > m.latestEth1BlockNumber
|
||||||
|
|
||||||
proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
||||||
if m.state == Started:
|
if m.state == Started:
|
||||||
return
|
return
|
||||||
|
@ -1401,7 +1404,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
||||||
proc newBlockHeadersHandler(blk: Eth1BlockHeader)
|
proc newBlockHeadersHandler(blk: Eth1BlockHeader)
|
||||||
{.raises: [Defect], gcsafe.} =
|
{.raises: [Defect], gcsafe.} =
|
||||||
try:
|
try:
|
||||||
if blk.number.uint64 > m.latestEth1BlockNumber:
|
if m.isNewLastBlock(blk):
|
||||||
eth1_latest_head.set blk.number.toGaugeValue
|
eth1_latest_head.set blk.number.toGaugeValue
|
||||||
m.latestEth1Block = some FullBlockId.init(blk)
|
m.latestEth1Block = some FullBlockId.init(blk)
|
||||||
m.eth1Progress.fire()
|
m.eth1Progress.fire()
|
||||||
|
@ -1440,6 +1443,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
||||||
|
|
||||||
debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[^1])
|
debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[^1])
|
||||||
|
|
||||||
|
var didPollOnce = false
|
||||||
while true:
|
while true:
|
||||||
if bnStatus == BeaconNodeStatus.Stopping:
|
if bnStatus == BeaconNodeStatus.Stopping:
|
||||||
when hasGenesisDetection:
|
when hasGenesisDetection:
|
||||||
|
@ -1457,18 +1461,21 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
||||||
m.startIdx = 0
|
m.startIdx = 0
|
||||||
return
|
return
|
||||||
|
|
||||||
let nextBlock = if mustUsePolling:
|
let nextBlock = if mustUsePolling or not didPollOnce:
|
||||||
let blk = awaitWithRetries(
|
let blk = awaitWithRetries(
|
||||||
m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false))
|
m.dataProvider.web3.provider.eth_getBlockByNumber(blockId("latest"), false))
|
||||||
|
|
||||||
let fullBlockId = FullBlockId.init(blk)
|
# Same as when handling events, minus `m.eth1Progress` round trip
|
||||||
|
if m.isNewLastBlock(blk):
|
||||||
if m.latestEth1Block.isSome and
|
eth1_latest_head.set blk.number.toGaugeValue
|
||||||
m.latestEth1Block.get == fullBlockId:
|
m.latestEth1Block = some FullBlockId.init(blk)
|
||||||
|
elif mustUsePolling:
|
||||||
await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds)
|
await sleepAsync(m.cfg.SECONDS_PER_ETH1_BLOCK.int.seconds)
|
||||||
continue
|
continue
|
||||||
|
else:
|
||||||
|
doAssert not didPollOnce
|
||||||
|
|
||||||
m.latestEth1Block = some fullBlockId
|
didPollOnce = true
|
||||||
blk
|
blk
|
||||||
else:
|
else:
|
||||||
awaitWithTimeout(m.eth1Progress.wait(), 5.minutes):
|
awaitWithTimeout(m.eth1Progress.wait(), 5.minutes):
|
||||||
|
@ -1476,12 +1483,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
||||||
|
|
||||||
m.eth1Progress.clear()
|
m.eth1Progress.clear()
|
||||||
|
|
||||||
if m.latestEth1Block.isNone:
|
doAssert m.latestEth1Block.isSome
|
||||||
# It should not be possible for `latestEth1Block` to be none here.
|
|
||||||
# Firing the `eth1Progress` event is always done after assinging
|
|
||||||
# a value for it.
|
|
||||||
continue
|
|
||||||
|
|
||||||
awaitWithRetries m.dataProvider.getBlockByHash(m.latestEth1Block.get.hash)
|
awaitWithRetries m.dataProvider.getBlockByHash(m.latestEth1Block.get.hash)
|
||||||
|
|
||||||
if m.currentEpoch >= m.cfg.BELLATRIX_FORK_EPOCH and m.terminalBlockHash.isNone:
|
if m.currentEpoch >= m.cfg.BELLATRIX_FORK_EPOCH and m.terminalBlockHash.isNone:
|
||||||
|
|
Loading…
Reference in New Issue