Better connection error handling in MainchainMonitor

This commit is contained in:
Yuriy Glukhov 2019-11-22 15:16:07 +02:00 committed by zah
parent ea241e407d
commit 2e875ea17e
4 changed files with 72 additions and 27 deletions

View File

@ -1,12 +1,11 @@
import
chronos, web3, json,
chronos, web3, json, chronicles,
spec/[datatypes, digest, crypto, beaconstate, helpers],
./extras
type
MainchainMonitor* = ref object
web3Url: string
web3: Web3
depositContractAddress: Address
genesisState: ref BeaconState
@ -21,6 +20,8 @@ type
eth1Block: BlockHash
eth1Data*: Eth1Data
runFut: Future[void]
QueueElement = (BlockHash, DepositData)
@ -39,21 +40,34 @@ contract(DepositContract):
const MIN_GENESIS_TIME = 0
proc updateEth1Data*(m: MainchainMonitor) {.async.} =
let ns = m.web3.contractSender(DepositContract, m.depositContractAddress)
# TODO: use m.eth1Block for web3 calls
let cnt = await ns.get_deposit_count().call()
let htr = await ns.get_deposit_root().call()
m.eth1Data.deposit_count = bytes_to_int(array[8, byte](cnt))
m.eth1Data.deposit_root.data = array[32, byte](htr)
proc updateEth1Data(m: MainchainMonitor, count: uint64, root: FixedBytes[32]) =
m.eth1Data.deposit_count = count
m.eth1Data.deposit_root.data = array[32, byte](root)
m.eth1Data.block_hash.data = array[32, byte](m.eth1Block)
proc processDeposits(m: MainchainMonitor) {.async.} =
proc processDeposits(m: MainchainMonitor, web3: Web3) {.async.} =
while true:
let (blkHash, data) = await m.depositQueue.popFirst()
var blk: BlockObject
var depositCount: uint64
var depositRoot: FixedBytes[32]
try:
blk = await web3.provider.eth_getBlockByHash(blkHash, false)
let ns = web3.contractSender(DepositContract, m.depositContractAddress)
# TODO: use m.eth1Block for web3 calls
let cnt = await ns.get_deposit_count().call()
depositRoot = await ns.get_deposit_root().call()
depositCount = bytes_to_int(array[8, byte](cnt))
except:
# Connection problem? Put the unprocessed deposit back to queue
m.depositQueue.addFirstNoWait((blkHash, data))
raise
debug "Got deposit from eth1", pubKey = data.pubKey
let blk = await m.web3.provider.eth_getBlockByHash(blkHash, false)
let dep = datatypes.Deposit(data: data)
m.pendingDeposits.add(dep)
inc m.depositCount
@ -77,13 +91,13 @@ proc processDeposits(m: MainchainMonitor) {.async.} =
m.genesisStateFut.complete()
m.genesisStateFut = nil
# TODO: Set curBlock to blk number
# TODO: This should be progressing in more independent way.
# The Eth1 cross-link can advance even when there are no new deposits.
await m.updateEth1Data
m.updateEth1Data(depositCount, depositRoot)
proc isRunning*(m: MainchainMonitor): bool =
not m.web3.isNil
not m.runFut.isNil
proc getGenesis*(m: MainchainMonitor): Future[BeaconState] {.async.} =
if m.genesisState.isNil:
@ -95,11 +109,29 @@ proc getGenesis*(m: MainchainMonitor): Future[BeaconState] {.async.} =
doAssert(not m.genesisState.isNil)
return m.genesisState[]
proc run(m: MainchainMonitor) {.async.} =
m.web3 = await newWeb3(m.web3Url)
let ns = m.web3.contractSender(DepositContract, m.depositContractAddress)
proc getBlockNumber(web3: Web3, hash: BlockHash): Future[Quantity] {.async.} =
let blk = await web3.provider.eth_getBlockByHash(hash, false)
return blk.number
let s = await ns.subscribe(DepositEvent, %*{"fromBlock": m.eth1Block}) do(pubkey: Bytes48, withdrawalCredentials: Bytes32, amount: Bytes8, signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode):
proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
if delayBeforeStart != ZeroDuration:
await sleepAsync(delayBeforeStart)
let web3 = await newWeb3(m.web3Url)
defer: await web3.close()
let processFut = m.processDeposits(web3)
web3.onDisconnect = proc() =
error "Web3 server disconnected", ulr = m.web3Url
processFut.cancel()
let startBlkNum = await web3.getBlockNumber(m.eth1Block)
debug "Starting eth1 monitor", fromBlock = startBlkNum.uint64
let ns = web3.contractSender(DepositContract, m.depositContractAddress)
let s = await ns.subscribe(DepositEvent, %*{"fromBlock": startBlkNum}) do(pubkey: Bytes48, withdrawalCredentials: Bytes32, amount: Bytes8, signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode):
let blkHash = BlockHash.fromHex(j["blockHash"].getStr())
let amount = bytes_to_int(array[8, byte](amount))
@ -111,14 +143,27 @@ proc run(m: MainchainMonitor) {.async.} =
signature: ValidatorSig.init(array[96, byte](signature)))))
try:
await m.processDeposits()
await processFut
finally:
await s.unsubscribe()
# await m.web3.close()
m.web3 = nil
proc start*(m: MainchainMonitor) =
asyncCheck m.run()
proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
if m.runFut.isNil:
let runFut = m.run(delayBeforeStart)
m.runFut = runFut
runFut.addCallback() do(p: pointer):
if runFut.failed and runFut == m.runFut:
error "Mainchain monitor failure, restarting", err = runFut.error.msg
m.runFut = nil
m.start(5.seconds)
proc start*(m: MainchainMonitor) {.inline.} =
m.start(0.seconds)
proc stop*(m: MainchainMonitor) =
if not m.runFut.isNil:
m.runFut.cancel()
m.runFut = nil
proc getPendingDeposits*(m: MainchainMonitor): seq[Deposit] =
# This should be a simple accessor for the reference kept above

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 299adfa76f77384f4b3f90cded16e0f6a7e8f2c0
Subproject commit 2518a4161f723405004c3e2a743fa08ec67404dc

2
vendor/nim-json-rpc vendored

@ -1 +1 @@
Subproject commit 9214b095fb0266e5cac44804663343cd8288a84c
Subproject commit b6336cb7257a02a1074aaab7891150f2ecc83fc9

2
vendor/nim-web3 vendored

@ -1 +1 @@
Subproject commit 7bc29e747004b8aa7988eab537029ecab73dcb45
Subproject commit 89d7a0c8fd1eb0f749432bd7136d8f385351c48e