More user-friendly logging during mainchain monitoring
This commit is contained in:
parent
e65c1f49ce
commit
14274587cf
|
@ -450,17 +450,6 @@ when useNativeSnappy:
|
||||||
else:
|
else:
|
||||||
include libp2p_streams_backend
|
include libp2p_streams_backend
|
||||||
|
|
||||||
template awaitWithTimeout[T](operation: Future[T],
|
|
||||||
deadline: Future[void],
|
|
||||||
onTimeout: untyped): T =
|
|
||||||
let f = operation
|
|
||||||
await f or deadline
|
|
||||||
if not f.finished:
|
|
||||||
cancel f
|
|
||||||
onTimeout
|
|
||||||
else:
|
|
||||||
f.read
|
|
||||||
|
|
||||||
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||||
ResponseMsg: type,
|
ResponseMsg: type,
|
||||||
timeout: Duration): Future[NetRes[ResponseMsg]]
|
timeout: Duration): Future[NetRes[ResponseMsg]]
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import
|
import
|
||||||
deques, tables, hashes, options,
|
deques, tables, hashes, options,
|
||||||
chronos, web3, json, chronicles,
|
chronos, web3, json, chronicles, eth/async_utils,
|
||||||
spec/[datatypes, digest, crypto, beaconstate, helpers]
|
spec/[datatypes, digest, crypto, beaconstate, helpers],
|
||||||
|
merkle_minimal
|
||||||
|
|
||||||
contract(DepositContract):
|
contract(DepositContract):
|
||||||
proc deposit(pubkey: Bytes48,
|
proc deposit(pubkey: Bytes48,
|
||||||
|
@ -86,6 +87,7 @@ type
|
||||||
|
|
||||||
const
|
const
|
||||||
reorgDepthLimit = 1000
|
reorgDepthLimit = 1000
|
||||||
|
web3Timeouts = 5.seconds
|
||||||
|
|
||||||
# TODO Nim's analysis on the lock level of the methods in this
|
# TODO Nim's analysis on the lock level of the methods in this
|
||||||
# module seems broken. Investigate and file this as an issue.
|
# module seems broken. Investigate and file this as an issue.
|
||||||
|
@ -304,13 +306,13 @@ proc init*(T: type MainchainMonitor,
|
||||||
|
|
||||||
const MIN_GENESIS_TIME = 0
|
const MIN_GENESIS_TIME = 0
|
||||||
|
|
||||||
proc readJsonDeposits(json: JsonNode): seq[Deposit] =
|
proc readJsonDeposits(depositsList: JsonNode): seq[Deposit] =
|
||||||
if json.kind != JArray:
|
if depositsList.kind != JArray:
|
||||||
raise newException(CatchableError,
|
raise newException(CatchableError,
|
||||||
"Web3 provider didn't return a list of deposit events")
|
"Web3 provider didn't return a list of deposit events")
|
||||||
|
|
||||||
for logEvent in json:
|
for logEvent in depositsList:
|
||||||
var logData = strip0xPrefix(json["data"].getStr)
|
var logData = strip0xPrefix(logEvent["data"].getStr)
|
||||||
var
|
var
|
||||||
pubkey: Bytes48
|
pubkey: Bytes48
|
||||||
withdrawalCredentials: Bytes32
|
withdrawalCredentials: Bytes32
|
||||||
|
@ -326,7 +328,6 @@ proc readJsonDeposits(json: JsonNode): seq[Deposit] =
|
||||||
offset = decode(logData, offset, index)
|
offset = decode(logData, offset, index)
|
||||||
|
|
||||||
result.add Deposit(
|
result.add Deposit(
|
||||||
# proof: TODO
|
|
||||||
data: DepositData(
|
data: DepositData(
|
||||||
pubkey: ValidatorPubKey.init(array[48, byte](pubkey)),
|
pubkey: ValidatorPubKey.init(array[48, byte](pubkey)),
|
||||||
withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)),
|
withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)),
|
||||||
|
@ -345,13 +346,20 @@ proc checkForGenesisEvent(m: MainchainMonitor) =
|
||||||
m.eth1Chain.totalDeposits >= totalDepositsNeeded:
|
m.eth1Chain.totalDeposits >= totalDepositsNeeded:
|
||||||
# This block is a genesis candidate
|
# This block is a genesis candidate
|
||||||
let startTime = lastBlock.timestamp.uint64
|
let startTime = lastBlock.timestamp.uint64
|
||||||
|
var genesisDeposits = m.eth1Chain.allDeposits
|
||||||
|
attachMerkleProofs genesisDeposits
|
||||||
var s = initialize_beacon_state_from_eth1(lastBlock.voteData.block_hash,
|
var s = initialize_beacon_state_from_eth1(lastBlock.voteData.block_hash,
|
||||||
startTime, m.eth1Chain.allDeposits, {})
|
startTime, genesisDeposits, {})
|
||||||
if is_valid_genesis_state(s[]):
|
if is_valid_genesis_state(s[]):
|
||||||
# https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
|
# https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
|
||||||
s.genesis_time = startTime
|
s.genesis_time = startTime
|
||||||
|
|
||||||
|
info "Eth2 genesis state detected",
|
||||||
|
genesisTime = startTime,
|
||||||
|
genesisEth1Block = lastBlock.voteData.block_hash
|
||||||
|
|
||||||
m.genesisState = s
|
m.genesisState = s
|
||||||
|
|
||||||
if not m.genesisStateFut.isNil:
|
if not m.genesisStateFut.isNil:
|
||||||
m.genesisStateFut.complete()
|
m.genesisStateFut.complete()
|
||||||
m.genesisStateFut = nil
|
m.genesisStateFut = nil
|
||||||
|
@ -370,6 +378,8 @@ proc processDeposits(m: MainchainMonitor, dataProvider: DataProviderRef) {.
|
||||||
let (blockHash, eventType) = await m.depositQueue.popFirst()
|
let (blockHash, eventType) = await m.depositQueue.popFirst()
|
||||||
|
|
||||||
if eventType == RemovedEvent:
|
if eventType == RemovedEvent:
|
||||||
|
info "New Eth1 head selected. Purging history of deposits",
|
||||||
|
purgedBlock = $blockHash
|
||||||
m.eth1Chain.purgeChain(blockHash)
|
m.eth1Chain.purgeChain(blockHash)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -421,14 +431,19 @@ proc processDeposits(m: MainchainMonitor, dataProvider: DataProviderRef) {.
|
||||||
|
|
||||||
m.eth1Chain.purgeDescendants(cachedParent)
|
m.eth1Chain.purgeDescendants(cachedParent)
|
||||||
|
|
||||||
|
# TODO: We may check that the new deposits produce a merkle
|
||||||
|
# root matching the `deposit_root` value from the block.
|
||||||
|
# Not doing this is equivalent to trusting the Eth1
|
||||||
|
# execution engine and data provider.
|
||||||
|
info "Eth1 block processed", eth1data = eth1Block.voteData
|
||||||
m.eth1Chain.addBlock eth1Block
|
m.eth1Chain.addBlock eth1Block
|
||||||
m.checkForGenesisEvent()
|
m.checkForGenesisEvent()
|
||||||
|
|
||||||
except CatchableError:
|
except CatchableError as err:
|
||||||
# Connection problem? Put the unprocessed deposit back to queue.
|
# Connection problem? Put the unprocessed deposit back to queue.
|
||||||
# Raising the exception here will lead to a restart of the whole monitor.
|
# Raising the exception here will lead to a restart of the whole monitor.
|
||||||
m.depositQueue.addFirstNoWait((blockHash, eventType))
|
m.depositQueue.addFirstNoWait((blockHash, eventType))
|
||||||
raise
|
raise err
|
||||||
|
|
||||||
proc isRunning*(m: MainchainMonitor): bool =
|
proc isRunning*(m: MainchainMonitor): bool =
|
||||||
not m.runFut.isNil
|
not m.runFut.isNil
|
||||||
|
@ -447,8 +462,7 @@ proc getGenesis*(m: MainchainMonitor): Future[BeaconStateRef] {.async.} =
|
||||||
raiseAssert "Unreachable code"
|
raiseAssert "Unreachable code"
|
||||||
|
|
||||||
method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
|
method getBlockByHash*(p: Web3DataProviderRef, hash: BlockHash): Future[BlockObject] =
|
||||||
discard
|
p.web3.provider.eth_getBlockByHash(hash, false)
|
||||||
# p.web3.provider.eth_getBlockByHash(hash, false)
|
|
||||||
|
|
||||||
method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
|
method close*(p: Web3DataProviderRef): Future[void] {.async, locks: 0.} =
|
||||||
if p.subscription != nil:
|
if p.subscription != nil:
|
||||||
|
@ -498,7 +512,8 @@ proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Quantity] {.asy
|
||||||
debug "Querying block number", hash = $hash
|
debug "Querying block number", hash = $hash
|
||||||
|
|
||||||
try:
|
try:
|
||||||
let blk = await p.getBlockByHash(hash)
|
let blk = awaitWithTimeout(p.getBlockByHash(hash), web3Timeouts):
|
||||||
|
return Quantity(0'u64)
|
||||||
return blk.number
|
return blk.number
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
notice "Failed to get Eth1 block number from hash",
|
notice "Failed to get Eth1 block number from hash",
|
||||||
|
@ -540,12 +555,16 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
|
||||||
let processFut = m.processDeposits(dataProvider)
|
let processFut = m.processDeposits(dataProvider)
|
||||||
try:
|
try:
|
||||||
dataProvider.onDisconnect do:
|
dataProvider.onDisconnect do:
|
||||||
error "Eth1 data provider disconnected",
|
warn "Eth1 data provider disconnected",
|
||||||
provider = m.dataProviderFactory.desc
|
provider = m.dataProviderFactory.desc
|
||||||
processFut.cancel()
|
processFut.cancel()
|
||||||
|
|
||||||
let startBlkNum = await dataProvider.getBlockNumber(m.startBlock)
|
let startBlkNum = if m.startBlock != default(BlockHash):
|
||||||
notice "Monitoring eth1 deposits",
|
await dataProvider.getBlockNumber(m.startBlock)
|
||||||
|
else:
|
||||||
|
Quantity(0'u64)
|
||||||
|
|
||||||
|
info "Monitoring eth1 deposits",
|
||||||
fromBlock = startBlkNum.uint64,
|
fromBlock = startBlkNum.uint64,
|
||||||
contract = $m.depositContractAddress,
|
contract = $m.depositContractAddress,
|
||||||
url = m.dataProviderFactory.desc
|
url = m.dataProviderFactory.desc
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit c13e59adaf988f9bb754dba05b0efa9176ffe664
|
Subproject commit da56b0531eb7ced50ffa3470e93d30c3a4a78434
|
Loading…
Reference in New Issue