Support for fallback web3 providers; Fix resource leaks on Eth1 monitor restarts

This commit is contained in:
Zahary Karadjov 2021-04-07 00:42:59 +03:00 committed by zah
parent 7451c4171f
commit ba59dd85cd
5 changed files with 35 additions and 25 deletions

View File

@ -102,10 +102,9 @@ type
desc: "A directory containing wallet files" desc: "A directory containing wallet files"
name: "wallets-dir" }: Option[InputDir] name: "wallets-dir" }: Option[InputDir]
web3Url* {. web3Urls* {.
defaultValue: "" desc: "One of more Web3 provider URLs used for obtaining deposit contract data"
desc: "URL of the Web3 server to observe Eth1" name: "web3-url" }: seq[string]
name: "web3-url" }: string
web3Mode* {. web3Mode* {.
hidden hidden

View File

@ -81,7 +81,8 @@ type
Eth1Monitor* = ref object Eth1Monitor* = ref object
state: Eth1MonitorState state: Eth1MonitorState
web3Url: string startIdx: int
web3Urls: seq[string]
eth1Network: Option[Eth1Network] eth1Network: Option[Eth1Network]
depositContractAddress*: Eth1Address depositContractAddress*: Eth1Address
@ -739,7 +740,7 @@ proc new(T: type Web3DataProvider,
depositContractAddress: Eth1Address, depositContractAddress: Eth1Address,
web3Url: string): Future[Result[Web3DataProviderRef, string]] {.async.} = web3Url: string): Future[Result[Web3DataProviderRef, string]] {.async.} =
let web3Fut = newWeb3(web3Url) let web3Fut = newWeb3(web3Url)
yield web3Fut or sleepAsync(chronos.seconds(5)) yield web3Fut or sleepAsync(chronos.seconds(10))
if (not web3Fut.finished) or web3Fut.failed: if (not web3Fut.finished) or web3Fut.failed:
await cancelAndWait(web3Fut) await cancelAndWait(web3Fut)
return err "Failed to setup web3 connection" return err "Failed to setup web3 connection"
@ -772,19 +773,22 @@ proc init*(T: type Eth1Chain, preset: RuntimePreset, db: BeaconChainDB): T =
proc init*(T: type Eth1Monitor, proc init*(T: type Eth1Monitor,
preset: RuntimePreset, preset: RuntimePreset,
db: BeaconChainDB, db: BeaconChainDB,
web3Url: string, web3Urls: seq[string],
depositContractAddress: Eth1Address, depositContractAddress: Eth1Address,
depositContractSnapshot: DepositContractSnapshot, depositContractSnapshot: DepositContractSnapshot,
eth1Network: Option[Eth1Network]): T = eth1Network: Option[Eth1Network]): T =
var web3Url = web3Url doAssert web3Urls.len > 0
fixupWeb3Urls web3Url
var web3Urls = web3Urls
for url in mitems(web3Urls):
fixupWeb3Urls url
putInitialDepositContractSnapshot(db, depositContractSnapshot) putInitialDepositContractSnapshot(db, depositContractSnapshot)
T(state: Initialized, T(state: Initialized,
eth1Chain: Eth1Chain.init(preset, db), eth1Chain: Eth1Chain.init(preset, db),
depositContractAddress: depositContractAddress, depositContractAddress: depositContractAddress,
web3Url: web3Url, web3Urls: web3Urls,
eth1Network: eth1Network, eth1Network: eth1Network,
eth1Progress: newAsyncEvent()) eth1Progress: newAsyncEvent())
@ -1000,12 +1004,15 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
if delayBeforeStart != ZeroDuration: if delayBeforeStart != ZeroDuration:
await sleepAsync(delayBeforeStart) await sleepAsync(delayBeforeStart)
let web3Url = m.web3Urls[m.startIdx mod m.web3Urls.len]
inc m.startIdx
info "Starting Eth1 deposit contract monitoring", info "Starting Eth1 deposit contract monitoring",
contract = $m.depositContractAddress, url = m.web3Url contract = $m.depositContractAddress, url = web3Url
let dataProviderRes = await Web3DataProvider.new( let dataProviderRes = await Web3DataProvider.new(
m.depositContractAddress, m.depositContractAddress,
m.web3Url) web3Url)
m.dataProvider = dataProviderRes.tryGet() m.dataProvider = dataProviderRes.tryGet()
let web3 = m.dataProvider.web3 let web3 = m.dataProvider.web3
@ -1150,12 +1157,14 @@ when hasGenesisDetection:
proc init*(T: type Eth1Monitor, proc init*(T: type Eth1Monitor,
db: BeaconChainDB, db: BeaconChainDB,
preset: RuntimePreset, preset: RuntimePreset,
web3Url: string, web3Urls: seq[string],
depositContractAddress: Eth1Address, depositContractAddress: Eth1Address,
depositContractDeployedAt: BlockHashOrNumber, depositContractDeployedAt: BlockHashOrNumber,
eth1Network: Option[Eth1Network]): Future[Result[T, string]] {.async.} = eth1Network: Option[Eth1Network]): Future[Result[T, string]] {.async.} =
doAssert web3Urls.len > 0
try: try:
let dataProviderRes = await Web3DataProvider.new(depositContractAddress, web3Url) var urlIdx = 0
let dataProviderRes = await Web3DataProvider.new(depositContractAddress, web3Urls[urlIdx])
if dataProviderRes.isErr: if dataProviderRes.isErr:
return err(dataProviderRes.error) return err(dataProviderRes.error)
var dataProvider = dataProviderRes.get var dataProvider = dataProviderRes.get
@ -1180,8 +1189,10 @@ when hasGenesisDetection:
# Until this is fixed upstream, we'll just try to recreate # Until this is fixed upstream, we'll just try to recreate
# the web3 provider before retrying. In case this fails, # the web3 provider before retrying. In case this fails,
# the Eth1Monitor will be restarted. # the Eth1Monitor will be restarted.
inc urlIdx
dataProvider = tryGet( dataProvider = tryGet(
await Web3DataProvider.new(depositContractAddress, web3Url)) await Web3DataProvider.new(depositContractAddress,
web3Urls[urlIdx mod web3Urls.len]))
blk.hash.asEth2Digest blk.hash.asEth2Digest
let depositContractSnapshot = DepositContractSnapshot( let depositContractSnapshot = DepositContractSnapshot(
@ -1190,7 +1201,7 @@ when hasGenesisDetection:
var monitor = Eth1Monitor.init( var monitor = Eth1Monitor.init(
db, db,
preset, preset,
web3Url, web3Urls,
depositContractAddress, depositContractAddress,
depositContractSnapshot, depositContractSnapshot,
eth1Network) eth1Network)

View File

@ -152,7 +152,7 @@ proc init*(T: type BeaconNode,
# This is a fresh start without a known genesis state # This is a fresh start without a known genesis state
# (most likely, it hasn't arrived yet). We'll try to # (most likely, it hasn't arrived yet). We'll try to
# obtain a genesis through the Eth1 deposits monitor: # obtain a genesis through the Eth1 deposits monitor:
if config.web3Url.len == 0: if config.web3Urls.len == 0:
fatal "Web3 URL not specified" fatal "Web3 URL not specified"
quit 1 quit 1
@ -161,7 +161,7 @@ proc init*(T: type BeaconNode,
let eth1MonitorRes = waitFor Eth1Monitor.init( let eth1MonitorRes = waitFor Eth1Monitor.init(
runtimePreset, runtimePreset,
db, db,
config.web3Url, config.web3Urls,
depositContractAddress, depositContractAddress,
depositContractDeployedAt, depositContractDeployedAt,
eth1Network) eth1Network)
@ -169,7 +169,7 @@ proc init*(T: type BeaconNode,
if eth1MonitorRes.isErr: if eth1MonitorRes.isErr:
fatal "Failed to start Eth1 monitor", fatal "Failed to start Eth1 monitor",
reason = eth1MonitorRes.error, reason = eth1MonitorRes.error,
web3Url = config.web3Url, web3Urls = config.web3Urls,
depositContractAddress, depositContractAddress,
depositContractDeployedAt depositContractDeployedAt
quit 1 quit 1
@ -271,14 +271,14 @@ proc init*(T: type BeaconNode,
chainDag.setTailState(checkpointState[], checkpointBlock) chainDag.setTailState(checkpointState[], checkpointBlock)
if eth1Monitor.isNil and if eth1Monitor.isNil and
config.web3Url.len > 0 and config.web3Urls.len > 0 and
genesisDepositsSnapshotContents.len > 0: genesisDepositsSnapshotContents.len > 0:
let genesisDepositsSnapshot = SSZ.decode(genesisDepositsSnapshotContents, let genesisDepositsSnapshot = SSZ.decode(genesisDepositsSnapshotContents,
DepositContractSnapshot) DepositContractSnapshot)
eth1Monitor = Eth1Monitor.init( eth1Monitor = Eth1Monitor.init(
runtimePreset, runtimePreset,
db, db,
config.web3Url, config.web3Urls,
depositContractAddress, depositContractAddress,
genesisDepositsSnapshot, genesisDepositsSnapshot,
eth1Network) eth1Network)
@ -1702,8 +1702,8 @@ proc doCreateTestnet(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.raise
let let
startTime = uint64(times.toUnix(times.getTime()) + config.genesisOffset) startTime = uint64(times.toUnix(times.getTime()) + config.genesisOffset)
outGenesis = config.outputGenesis.string outGenesis = config.outputGenesis.string
eth1Hash = if config.web3Url.len == 0: eth1BlockHash eth1Hash = if config.web3Urls.len == 0: eth1BlockHash
else: (waitFor getEth1BlockHash(config.web3Url, blockId("latest"))).asEth2Digest else: (waitFor getEth1BlockHash(config.web3Urls[0], blockId("latest"))).asEth2Digest
runtimePreset = getRuntimePresetForNetwork(config.eth2Network) runtimePreset = getRuntimePresetForNetwork(config.eth2Network)
var var
initialState = initialize_beacon_state_from_eth1( initialState = initialize_beacon_state_from_eth1(

2
vendor/news vendored

@ -1 +1 @@
Subproject commit 363dd4ca77582310f55d98569b9fd2a35678f17d Subproject commit 002b21b49226473cbe4d6cc67e9d836c340babc3

2
vendor/nim-json-rpc vendored

@ -1 +1 @@
Subproject commit 64d40d6c1a095761a03d1ba55eb45877596e8e7b Subproject commit 7a9d118929483c38f67df81514011414e229cd66