Retry all web3 requests up to 3 times with exponential backoff

This commit is contained in:
Zahary Karadjov 2020-12-01 23:20:28 +02:00 committed by zah
parent f3b8c777a7
commit 7225a6d6ed
2 changed files with 67 additions and 27 deletions

View File

@ -22,7 +22,7 @@ NODE_ID := 0
BASE_PORT := 9000 BASE_PORT := 9000
BASE_RPC_PORT := 9190 BASE_RPC_PORT := 9190
BASE_METRICS_PORT := 8008 BASE_METRICS_PORT := 8008
GOERLI_WEB3_URL := "wss://goerli.infura.io/ws/v3/809a18497dd74102b5f37d25aae3c85a" WEB3_URL := "wss://goerli.infura.io/ws/v3/809a18497dd74102b5f37d25aae3c85a"
VALIDATORS := 1 VALIDATORS := 1
CPU_LIMIT := 0 CPU_LIMIT := 0
BUILD_END_MSG := "\\e[92mBuild completed successfully:\\e[39m" BUILD_END_MSG := "\\e[92mBuild completed successfully:\\e[39m"
@ -165,7 +165,7 @@ clean_eth2_network_simulation_all:
rm -rf tests/simulation/{data,validators} rm -rf tests/simulation/{data,validators}
GOERLI_TESTNETS_PARAMS := \ GOERLI_TESTNETS_PARAMS := \
--web3-url=$(GOERLI_WEB3_URL) \ --web3-url=$(WEB3_URL) \
--tcp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \ --tcp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \
--udp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \ --udp-port=$$(( $(BASE_PORT) + $(NODE_ID) )) \
--metrics \ --metrics \
@ -283,7 +283,7 @@ define MAKE_DEPOSIT
--count=$(VALIDATORS) --count=$(VALIDATORS)
build/deposit_contract sendDeposits \ build/deposit_contract sendDeposits \
--web3-url=$(GOERLI_WEB3_URL) \ --web3-url=$(WEB3_URL) \
--deposit-contract=$$(cat vendor/eth2-testnets/shared/$(1)/deposit_contract.txt) \ --deposit-contract=$$(cat vendor/eth2-testnets/shared/$(1)/deposit_contract.txt) \
--deposits-file=nbc-$(1)-deposits.json \ --deposits-file=nbc-$(1)-deposits.json \
--min-delay=$(DEPOSITS_DELAY) \ --min-delay=$(DEPOSITS_DELAY) \

View File

@ -83,8 +83,9 @@ type
Web3DataProviderRef* = ref Web3DataProvider Web3DataProviderRef* = ref Web3DataProvider
CorruptDataProvider = object of CatchableError DataProviderFailure = object of CatchableError
DataProviderTimeout = object of CatchableError CorruptDataProvider = object of DataProviderFailure
DataProviderTimeout = object of DataProviderFailure
DisconnectHandler* = proc () {.gcsafe, raises: [Defect].} DisconnectHandler* = proc () {.gcsafe, raises: [Defect].}
@ -213,9 +214,43 @@ func hash*(x: Eth1Data): Hash =
template hash*(x: Eth1Block): Hash = template hash*(x: Eth1Block): Hash =
hash(x.voteData) hash(x.voteData)
template awaitWithRetries[T](lazyFutExpr: Future[T],
retries = 3,
timeout = web3Timeouts): untyped =
const
reqType = astToStr(lazyFutExpr)
var
retryDelayMs = 16000
f: Future[T]
attempts = 0
while true:
f = lazyFutExpr
yield f or sleepAsync(timeout)
if not f.finished:
await cancelAndWait(f)
elif f.failed:
if f.error[] of Defect:
raise f.error
else:
debug "Web3 request failed", req = reqType, err = f.error.msg
else:
break
inc attempts
if attempts >= retries:
raise newException(DataProviderFailure,
reqType & " failed " & $retries & " times")
await sleepAsync(chronos.milliseconds(retryDelayMs))
retryDelayMs *= 2
read(f)
proc close*(p: Web3DataProviderRef): Future[void] {.async.} = proc close*(p: Web3DataProviderRef): Future[void] {.async.} =
if p.blockHeadersSubscription != nil: if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe() awaitWithRetries(p.blockHeadersSubscription.unsubscribe())
await p.web3.close() await p.web3.close()
@ -273,7 +308,8 @@ proc depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] =
signature: ValidatorSig.init(array[96, byte](signature))) signature: ValidatorSig.init(array[96, byte](signature)))
proc fetchTimestamp(p: Web3DataProviderRef, blk: Eth1Block) {.async.} = proc fetchTimestamp(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
let web3block = await p.getBlockByHash(blk.voteData.block_hash.asBlockHash) let web3block = awaitWithRetries(
p.getBlockByHash(blk.voteData.block_hash.asBlockHash))
blk.timestamp = Eth1BlockTimestamp web3block.timestamp blk.timestamp = Eth1BlockTimestamp web3block.timestamp
type type
@ -333,12 +369,12 @@ proc onBlockHeaders*(p: Web3DataProviderRef,
blockHeaderHandler: BlockHeaderHandler, blockHeaderHandler: BlockHeaderHandler,
errorHandler: SubscriptionErrorHandler) {.async.} = errorHandler: SubscriptionErrorHandler) {.async.} =
if p.blockHeadersSubscription != nil: if p.blockHeadersSubscription != nil:
await p.blockHeadersSubscription.unsubscribe() awaitWithRetries(p.blockHeadersSubscription.unsubscribe())
info "Waiting for new Eth1 block headers" info "Waiting for new Eth1 block headers"
p.blockHeadersSubscription = await p.web3.subscribeForBlockHeaders( p.blockHeadersSubscription = awaitWithRetries(
blockHeaderHandler, errorHandler) p.web3.subscribeForBlockHeaders(blockHeaderHandler, errorHandler))
{.push raises: [Defect].} {.push raises: [Defect].}
@ -589,7 +625,7 @@ proc init*(T: type Eth1Monitor,
if eth1Network.isSome: if eth1Network.isSome:
let let
providerNetwork = await web3.provider.net_version() providerNetwork = awaitWithRetries web3.provider.net_version()
expectedNetwork = case eth1Network.get expectedNetwork = case eth1Network.get
of mainnet: "1" of mainnet: "1"
of rinkeby: "4" of rinkeby: "4"
@ -689,7 +725,9 @@ proc syncBlockRange(m: Eth1Monitor,
if blk.number > fullSyncFromBlock: if blk.number > fullSyncFromBlock:
let lastBlock = m.eth1Chain.blocks.peekLast let lastBlock = m.eth1Chain.blocks.peekLast
for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number: for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number:
let blockWithoutDeposits = await m.dataProvider.getBlockByNumber(n) let blockWithoutDeposits = awaitWithRetries(
m.dataProvider.getBlockByNumber(n))
m.eth1Chain.addBlock( m.eth1Chain.addBlock(
lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits)) lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits))
@ -700,7 +738,7 @@ proc syncBlockRange(m: Eth1Monitor,
template lastBlock: auto = blocksWithDeposits[lastIdx] template lastBlock: auto = blocksWithDeposits[lastIdx]
let status = when hasDepositRootChecks: let status = when hasDepositRootChecks:
await m.dataProvider.fetchDepositContractData(lastBlock) awaitWithRetries m.dataProvider.fetchDepositContractData(lastBlock)
else: else:
DepositRootUnavailable DepositRootUnavailable
@ -748,7 +786,8 @@ proc syncBlockRange(m: Eth1Monitor,
if maxBlockNumberRequested == toBlock and if maxBlockNumberRequested == toBlock and
(m.eth1Chain.blocks.len == 0 or lastBlock.number != toBlock): (m.eth1Chain.blocks.len == 0 or lastBlock.number != toBlock):
let web3Block = await m.dataProvider.getBlockByNumber(toBlock) let web3Block = awaitWithRetries(
m.dataProvider.getBlockByNumber(toBlock))
debug "Latest block doesn't hold deposits. Obtaining it", debug "Latest block doesn't hold deposits. Obtaining it",
ts = web3Block.timestamp.uint64, ts = web3Block.timestamp.uint64,
@ -756,14 +795,14 @@ proc syncBlockRange(m: Eth1Monitor,
m.eth1Chain.addBlock lastBlock.makeSuccessorWithoutDeposits(web3Block) m.eth1Chain.addBlock lastBlock.makeSuccessorWithoutDeposits(web3Block)
else: else:
await m.dataProvider.fetchTimestamp(lastBlock) awaitWithRetries m.dataProvider.fetchTimestamp(lastBlock)
var genesisBlockIdx = m.eth1Chain.blocks.len - 1 var genesisBlockIdx = m.eth1Chain.blocks.len - 1
if m.isAfterMinGenesisTime(m.eth1Chain.blocks[genesisBlockIdx]): if m.isAfterMinGenesisTime(m.eth1Chain.blocks[genesisBlockIdx]):
for i in 1 ..< eth1Blocks.len: for i in 1 ..< eth1Blocks.len:
let idx = (m.eth1Chain.blocks.len - 1) - i let idx = (m.eth1Chain.blocks.len - 1) - i
let blk = m.eth1Chain.blocks[idx] let blk = m.eth1Chain.blocks[idx]
await m.dataProvider.fetchTimestamp(blk) awaitWithRetries m.dataProvider.fetchTimestamp(blk)
if m.isGenesisCandidate(blk): if m.isGenesisCandidate(blk):
genesisBlockIdx = idx genesisBlockIdx = idx
else: else:
@ -785,11 +824,12 @@ proc syncBlockRange(m: Eth1Monitor,
if genesisBlockIdx > 0: if genesisBlockIdx > 0:
let genesisParent = m.eth1Chain.blocks[genesisBlockIdx - 1] let genesisParent = m.eth1Chain.blocks[genesisBlockIdx - 1]
if genesisParent.timestamp == 0: if genesisParent.timestamp == 0:
await m.dataProvider.fetchTimestamp(genesisParent) awaitWithRetries m.dataProvider.fetchTimestamp(genesisParent)
if m.hasEnoughValidators(genesisParent) and if m.hasEnoughValidators(genesisParent) and
genesisBlock.number - genesisParent.number > 1: genesisBlock.number - genesisParent.number > 1:
genesisBlock = await m.findGenesisBlockInRange(genesisParent, genesisBlock = awaitWithRetries(
genesisBlock) m.findGenesisBlockInRange(genesisParent, genesisBlock))
m.signalGenesis m.createGenesisState(genesisBlock) m.signalGenesis m.createGenesisState(genesisBlock)
proc startEth1Syncing(m: Eth1Monitor) {.async.} = proc startEth1Syncing(m: Eth1Monitor) {.async.} =
@ -799,11 +839,8 @@ proc startEth1Syncing(m: Eth1Monitor) {.async.} =
m.eth2FinalizedDepositsMerkleizer = m.knownStart.createMerkleizer m.eth2FinalizedDepositsMerkleizer = m.knownStart.createMerkleizer
let startBlock = awaitWithTimeout( let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.knownStart.eth1Block.asBlockHash), m.dataProvider.getBlockByHash(m.knownStart.eth1Block.asBlockHash))
web3Timeouts):
error "Eth1 sync failed to obtain information about the starting block in time"
return
doAssert m.eth1Chain.blocks.len == 0 doAssert m.eth1Chain.blocks.len == 0
m.eth1Chain.addBlock Eth1Block( m.eth1Chain.addBlock Eth1Block(
@ -888,7 +925,8 @@ proc start*(m: Eth1Monitor) =
proc getEth1BlockHash*(url: string, blockId: RtBlockIdentifier): Future[BlockHash] {.async.} = proc getEth1BlockHash*(url: string, blockId: RtBlockIdentifier): Future[BlockHash] {.async.} =
let web3 = await newWeb3(url) let web3 = await newWeb3(url)
try: try:
let blk = await web3.provider.eth_getBlockByNumber(blockId, false) let blk = awaitWithRetries(
web3.provider.eth_getBlockByNumber(blockId, false))
return blk.hash return blk.hash
finally: finally:
await web3.close() await web3.close()
@ -914,7 +952,8 @@ when hasGenesisDetection:
var blk: BlockObject var blk: BlockObject
while true: while true:
try: try:
blk = await dataProvider.getBlockByNumber(depositContractDeployedAt.number) blk = awaitWithRetries(
dataProvider.getBlockByNumber(depositContractDeployedAt.number))
break break
except CatchableError as err: except CatchableError as err:
error "Failed to obtain details for the starting block " & error "Failed to obtain details for the starting block " &
@ -1012,7 +1051,8 @@ when hasGenesisDetection:
float(endBlock.number - startBlock.number) float(endBlock.number - startBlock.number)
blocksToJump = max(float(MIN_GENESIS_TIME - startBlockTime) / secondsPerBlock, 1.0) blocksToJump = max(float(MIN_GENESIS_TIME - startBlockTime) / secondsPerBlock, 1.0)
candidateNumber = min(endBlock.number - 1, startBlock.number + blocksToJump.uint64) candidateNumber = min(endBlock.number - 1, startBlock.number + blocksToJump.uint64)
candidateBlock = await m.dataProvider.getBlockByNumber(candidateNumber) candidateBlock = awaitWithRetries(
m.dataProvider.getBlockByNumber(candidateNumber))
var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64, var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64,
timestamp: candidateBlock.timestamp.uint64, timestamp: candidateBlock.timestamp.uint64,