Introduce some short-cuts in the Eth1 monitoring for faster syncing
This commit is contained in:
parent
12ee61421f
commit
f20f077827
|
@ -182,9 +182,17 @@ func purgeChain*(eth1Chain: var Eth1Chain, blockHash: BlockHash) =
|
||||||
template purgeDescendants*(eth1CHain: Eth1Chain, blk: Eth1Block) =
|
template purgeDescendants*(eth1CHain: Eth1Chain, blk: Eth1Block) =
|
||||||
trimHeight(eth1Chain, blk.number)
|
trimHeight(eth1Chain, blk.number)
|
||||||
|
|
||||||
func addBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block) =
|
func isSuccessorBlock(eth1Chain: Eth1Chain, newBlock: Eth1Block): bool =
|
||||||
if eth1Chain.blocks.len > 0:
|
if eth1Chain.blocks.len == 0:
|
||||||
doAssert eth1Chain.blocks.peekLast.number + 1 == newBlock.number
|
return newBlock.deposits.len.uint64 == newBlock.voteData.deposit_count
|
||||||
|
|
||||||
|
let lastBlock = eth1Chain.blocks.peekLast
|
||||||
|
lastBlock.number < newBlock.number and
|
||||||
|
(lastBlock.voteData.deposit_count + newBlock.deposits.len.uint64) == newBlock.voteData.deposit_count
|
||||||
|
|
||||||
|
func addSuccessorBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block): bool =
|
||||||
|
result = isSuccessorBlock(eth1Chain, newBlock)
|
||||||
|
if result:
|
||||||
eth1Chain.blocks.addLast newBlock
|
eth1Chain.blocks.addLast newBlock
|
||||||
eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
|
eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
|
||||||
|
|
||||||
|
@ -372,7 +380,7 @@ proc processDeposits(m: MainchainMonitor,
|
||||||
let (blockHash, eventType) = await m.depositQueue.popFirst()
|
let (blockHash, eventType) = await m.depositQueue.popFirst()
|
||||||
|
|
||||||
if eventType == RemovedEvent:
|
if eventType == RemovedEvent:
|
||||||
info "New Eth1 head selected. Purging history of deposits",
|
debug "New Eth1 head selected. Purging history of deposits",
|
||||||
purgedBlock = $blockHash
|
purgedBlock = $blockHash
|
||||||
m.eth1Chain.purgeChain(blockHash)
|
m.eth1Chain.purgeChain(blockHash)
|
||||||
continue
|
continue
|
||||||
|
@ -384,11 +392,20 @@ proc processDeposits(m: MainchainMonitor,
|
||||||
doAssert Eth1BlockNumber(web3Block.number) > startBlkNum
|
doAssert Eth1BlockNumber(web3Block.number) > startBlkNum
|
||||||
let eth1Block = await dataProvider.fetchDepositData(web3Block)
|
let eth1Block = await dataProvider.fetchDepositData(web3Block)
|
||||||
|
|
||||||
var cachedParent = m.eth1Chain.findParent(web3Block)
|
if m.eth1Chain.addSuccessorBlock(eth1Block):
|
||||||
if cachedParent == nil:
|
# TODO: We may check that the new deposits produce a merkle
|
||||||
|
# root matching the `deposit_root` value from the block.
|
||||||
|
# Not doing this is equivalent to trusting the Eth1
|
||||||
|
# execution engine and data provider.
|
||||||
|
info "Eth1 block processed", eth1data = eth1Block.voteData
|
||||||
|
m.checkForGenesisEvent()
|
||||||
|
else:
|
||||||
# We are missing the parent block.
|
# We are missing the parent block.
|
||||||
# This shouldn't be happening if the deposits events are reported in
|
# This shouldn't be happening if the deposits events are reported in
|
||||||
# proper order, but nevertheless let's try to repair our chain:
|
# proper order, but nevertheless let's try to repair our chain:
|
||||||
|
var cachedParent = m.eth1Chain.findParent(web3Block)
|
||||||
|
doAssert cachedParent == nil
|
||||||
|
|
||||||
var chainOfParents = newSeq[Eth1Block]()
|
var chainOfParents = newSeq[Eth1Block]()
|
||||||
var parentHash = web3Block.parentHash
|
var parentHash = web3Block.parentHash
|
||||||
|
|
||||||
|
@ -417,7 +434,8 @@ proc processDeposits(m: MainchainMonitor,
|
||||||
# No more deposit events are expected
|
# No more deposit events are expected
|
||||||
m.eth1Chain.clear()
|
m.eth1Chain.clear()
|
||||||
for i in countdown(chainOfParents.len - 1, 0):
|
for i in countdown(chainOfParents.len - 1, 0):
|
||||||
m.eth1Chain.addBlock chainOfParents[i]
|
let isSuccessor = m.eth1Chain.addSuccessorBlock chainOfParents[i]
|
||||||
|
doAssert isSuccessor
|
||||||
cachedParent = m.eth1Chain.blocks.peekLast
|
cachedParent = m.eth1Chain.blocks.peekLast
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -426,7 +444,8 @@ proc processDeposits(m: MainchainMonitor,
|
||||||
if localParent != nil:
|
if localParent != nil:
|
||||||
m.eth1Chain.purgeDescendants(localParent)
|
m.eth1Chain.purgeDescendants(localParent)
|
||||||
for i in countdown(chainOfParents.len - 1, 0):
|
for i in countdown(chainOfParents.len - 1, 0):
|
||||||
m.eth1Chain.addBlock chainOfParents[i]
|
let isSuccessor = m.eth1Chain.addSuccessorBlock chainOfParents[i]
|
||||||
|
doAssert isSuccessor
|
||||||
cachedParent = m.eth1Chain.blocks.peekLast
|
cachedParent = m.eth1Chain.blocks.peekLast
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -435,14 +454,6 @@ proc processDeposits(m: MainchainMonitor,
|
||||||
|
|
||||||
m.eth1Chain.purgeDescendants(cachedParent)
|
m.eth1Chain.purgeDescendants(cachedParent)
|
||||||
|
|
||||||
# TODO: We may check that the new deposits produce a merkle
|
|
||||||
# root matching the `deposit_root` value from the block.
|
|
||||||
# Not doing this is equivalent to trusting the Eth1
|
|
||||||
# execution engine and data provider.
|
|
||||||
info "Eth1 block processed", eth1data = eth1Block.voteData
|
|
||||||
m.eth1Chain.addBlock eth1Block
|
|
||||||
m.checkForGenesisEvent()
|
|
||||||
|
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
# Connection problem? Put the unprocessed deposit back to queue.
|
# Connection problem? Put the unprocessed deposit back to queue.
|
||||||
# Raising the exception here will lead to a restart of the whole monitor.
|
# Raising the exception here will lead to a restart of the whole monitor.
|
||||||
|
@ -550,6 +561,9 @@ func web3Provider*(web3Url: string): DataProviderFactory =
|
||||||
|
|
||||||
DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory)
|
DataProviderFactory(desc: "web3(" & web3Url & ")", new: factory)
|
||||||
|
|
||||||
|
func `===`(json: JsonNode, boolean: bool): bool =
|
||||||
|
json.kind == JBool and json.bval == boolean
|
||||||
|
|
||||||
proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
|
proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
|
||||||
if delayBeforeStart != ZeroDuration:
|
if delayBeforeStart != ZeroDuration:
|
||||||
await sleepAsync(delayBeforeStart)
|
await sleepAsync(delayBeforeStart)
|
||||||
|
@ -580,7 +594,7 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
|
||||||
try:
|
try:
|
||||||
let
|
let
|
||||||
blockHash = BlockHash.fromHex(j["blockHash"].getStr())
|
blockHash = BlockHash.fromHex(j["blockHash"].getStr())
|
||||||
eventType = if j.hasKey("removed"): RemovedEvent
|
eventType = if j{"removed"} === true: RemovedEvent
|
||||||
else: NewEvent
|
else: NewEvent
|
||||||
|
|
||||||
m.depositQueue.addLastNoWait((blockHash, eventType))
|
m.depositQueue.addLastNoWait((blockHash, eventType))
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 5498b62dbd99afe0add0d763d94bbfad640e766b
|
Subproject commit e0e51015b7348b61aa5d7391af4f8f4487713f91
|
|
@ -1 +1 @@
|
||||||
Subproject commit cf82e2d51d12c3ca461cb170c2a4e2eada3bfe67
|
Subproject commit 152eb1b58cd618a175dc2ae6fba39e620115c356
|
|
@ -1 +1 @@
|
||||||
Subproject commit 4889e41a1ca74dba5a85622e5dbf50547bec08f2
|
Subproject commit 694ff2ad74b36f7f8402235f92134cb5532661d4
|
Loading…
Reference in New Issue