Avoid potential stalls in the Eth1 monitor code due to missing timeouts (#3905)

* Avoid potential stalls in the Eth1 monitor code due to missing timeouts

* 1 second timeout for exchange-configuration
This commit is contained in:
zah 2022-07-25 22:23:02 +03:00 committed by GitHub
parent 1a1553fee8
commit 7d27e10315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 52 deletions

View File

@ -425,7 +425,8 @@ proc close(p: Web3DataProviderRef): Future[void] {.async.} =
except CatchableError: except CatchableError:
debug "Failed to clean up block headers subscription properly" debug "Failed to clean up block headers subscription properly"
await p.web3.close() awaitWithTimeout(p.web3.close(), 30.seconds):
debug "Failed to close data provider in time"
proc getBlockByHash(p: Web3DataProviderRef, hash: BlockHash): proc getBlockByHash(p: Web3DataProviderRef, hash: BlockHash):
Future[BlockObject] = Future[BlockObject] =
@ -538,8 +539,15 @@ proc exchangeTransitionConfiguration*(p: Eth1Monitor): Future[void] {.async.} =
# https://github.com/nim-lang/Nim/issues/19802 # https://github.com/nim-lang/Nim/issues/19802
(static(default(Quantity)))) (static(default(Quantity))))
let ecTransitionConfiguration = let ecTransitionConfiguration =
await p.dataProvider.web3.provider.engine_exchangeTransitionConfigurationV1( try:
ccTransitionConfiguration) awaitWithRetries(
p.dataProvider.web3.provider.engine_exchangeTransitionConfigurationV1(
ccTransitionConfiguration),
timeout = 1.seconds)
except CatchableError as err:
debug "Failed to exchange transition configuration", err = err.msg
return
if ccTransitionConfiguration != ecTransitionConfiguration: if ccTransitionConfiguration != ecTransitionConfiguration:
warn "exchangeTransitionConfiguration: Configuration mismatch detected", warn "exchangeTransitionConfiguration: Configuration mismatch detected",
consensusTerminalTotalDifficulty = consensusTerminalTotalDifficulty =
@ -625,14 +633,14 @@ type
DepositCountIncorrect DepositCountIncorrect
DepositCountUnavailable DepositCountUnavailable
template awaitOrRaiseOnTimeout[T](fut: Future[T],
timeout: Duration): T =
awaitWithTimeout(fut, timeout):
raise newException(DataProviderTimeout, "Timeout")
when hasDepositRootChecks: when hasDepositRootChecks:
const const
contractCallTimeout = seconds(60) contractCallTimeout = 60.seconds
template awaitOrRaiseOnTimeout[T](fut: Future[T],
timeout: Duration): T =
awaitWithTimeout(fut, timeout):
raise newException(DataProviderTimeout, "Timeout")
func fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block): func fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
Future[DepositContractDataStatus] {.async.} = Future[DepositContractDataStatus] {.async.} =
@ -915,7 +923,7 @@ proc new*(T: type Web3DataProvider,
jwtSecret: Option[seq[byte]]): jwtSecret: Option[seq[byte]]):
Future[Result[Web3DataProviderRef, string]] {.async.} = Future[Result[Web3DataProviderRef, string]] {.async.} =
let web3Fut = newWeb3(web3Url, getJsonRpcRequestHeaders(jwtSecret)) let web3Fut = newWeb3(web3Url, getJsonRpcRequestHeaders(jwtSecret))
yield web3Fut or sleepAsync(chronos.seconds(10)) yield web3Fut or sleepAsync(10.seconds)
if (not web3Fut.finished) or web3Fut.failed: if (not web3Fut.finished) or web3Fut.failed:
await cancelAndWait(web3Fut) await cancelAndWait(web3Fut)
if web3Fut.failed: if web3Fut.failed:
@ -1023,7 +1031,7 @@ func clear(chain: var Eth1Chain) =
chain.hasConsensusViolation = false chain.hasConsensusViolation = false
proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} = proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
const checkInterval = chronos.seconds(30) const checkInterval = 30.seconds
let let
web3Url = m.web3Urls[0] web3Url = m.web3Urls[0]
@ -1063,7 +1071,8 @@ proc doStop(m: Eth1Monitor) {.async.} =
safeCancel m.runFut safeCancel m.runFut
if m.dataProvider != nil: if m.dataProvider != nil:
await m.dataProvider.close() awaitWithTimeout(m.dataProvider.close(), 30.seconds):
debug "Failed to close data provider in time"
m.dataProvider = nil m.dataProvider = nil
proc ensureDataProvider*(m: Eth1Monitor) {.async.} = proc ensureDataProvider*(m: Eth1Monitor) {.async.} =
@ -1499,7 +1508,8 @@ proc start*(m: Eth1Monitor) =
proc getEth1BlockHash*( proc getEth1BlockHash*(
url: string, blockId: RtBlockIdentifier, jwtSecret: Option[seq[byte]]): url: string, blockId: RtBlockIdentifier, jwtSecret: Option[seq[byte]]):
Future[BlockHash] {.async.} = Future[BlockHash] {.async.} =
let web3 = await newWeb3(url, getJsonRpcRequestHeaders(jwtSecret)) let web3 = awaitOrRaiseOnTimeout(newWeb3(url, getJsonRpcRequestHeaders(jwtSecret)),
10.seconds)
try: try:
let blk = awaitWithRetries( let blk = awaitWithRetries(
web3.provider.eth_getBlockByNumber(blockId, false)) web3.provider.eth_getBlockByNumber(blockId, false))
@ -1507,54 +1517,61 @@ proc getEth1BlockHash*(
finally: finally:
await web3.close() await web3.close()
func `$`(x: Quantity): string =
$(x.uint64)
func `$`(x: BlockObject): string =
$(x.number) & " [" & $(x.hash) & "]"
proc testWeb3Provider*(web3Url: Uri, proc testWeb3Provider*(web3Url: Uri,
depositContractAddress: Eth1Address, depositContractAddress: Eth1Address,
jwtSecret: Option[seq[byte]]) {.async.} = jwtSecret: Option[seq[byte]]) {.async.} =
template mustSucceed(action: static string, expr: untyped): untyped = stdout.write "Establishing web3 connection..."
try: expr var web3: Web3
try:
web3 = awaitOrRaiseOnTimeout(
newWeb3($web3Url, getJsonRpcRequestHeaders(jwtSecret)), 5.seconds)
stdout.write "\rEstablishing web3 connection: Connected\n"
except CatchableError as err:
stdout.write "\rEstablishing web3 connection: Failure(" & err.msg & ")\n"
quit 1
template request(actionDesc: static string,
action: untyped): untyped =
stdout.write actionDesc & "..."
stdout.flushFile()
var res: typeof(read action)
try:
res = awaitWithRetries action
stdout.write "\r" & actionDesc & ": " & $res
except CatchableError as err: except CatchableError as err:
fatal("Failed to " & action, err = err.msg) stdout.write "\r" & actionDesc & ": Error(" & err.msg & ")"
quit 1 stdout.write "\n"
res
let let
web3 = mustSucceed "connect to web3 provider": clientVersion = request "Client version":
await newWeb3( web3.provider.web3_clientVersion()
$web3Url, getJsonRpcRequestHeaders(jwtSecret))
latestBlock = mustSucceed "get latest block":
awaitWithRetries web3.provider.eth_getBlockByNumber(blockId("latest"), false)
syncStatus = mustSucceed "get sync status":
awaitWithRetries web3.provider.eth_syncing()
peers =
try:
awaitWithRetries web3.provider.net_peerCount()
except:
0
clientVersion = mustSucceed "get client version":
awaitWithRetries web3.provider.web3_clientVersion()
mining = mustSucceed "get mining status":
awaitWithRetries web3.provider.eth_mining()
echo "Client Version: ", clientVersion chainId = request "Chain ID":
echo "Network Peers: ", peers web3.provider.eth_chainId()
echo "Syncing: ", syncStatus
echo "Latest block: ", latestBlock.number.uint64
echo "Last Known Nonce: ", web3.lastKnownNonce
echo "Mining: ", mining
try: latestBlock = request "Latest block":
let chainId = awaitWithRetries web3.provider.eth_chainId() web3.provider.eth_getBlockByNumber(blockId("latest"), false)
echo "Chain ID: ", chainId.uint64
except DataProviderFailure as exc:
# Typically because it's not synced through EIP-155.
echo "Web3 provider does not provide chain ID: " & exc.msg
let ns = web3.contractSender(DepositContract, depositContractAddress) syncStatus = request "Sync status":
try: web3.provider.eth_syncing()
let depositRoot = awaitWithRetries(
ns.get_deposit_root.call(blockNumber = latestBlock.number.uint64)) peers = request "Peers":
echo "Deposit root: ", depositRoot web3.provider.net_peerCount()
except CatchableError as err:
echo "Web3 provider is not archive mode: ", err.msg miningStatus = request "Mining status":
web3.provider.eth_mining()
ns = web3.contractSender(DepositContract, depositContractAddress)
depositRoot = request "Deposit root":
ns.get_deposit_root.call(blockNumber = latestBlock.number.uint64)
when hasGenesisDetection: when hasGenesisDetection:
proc loadPersistedDeposits*(monitor: Eth1Monitor) = proc loadPersistedDeposits*(monitor: Eth1Monitor) =