diff --git a/beacon_chain/mainchain_monitor.nim b/beacon_chain/mainchain_monitor.nim index fae9ee2b7..0d72c8c38 100644 --- a/beacon_chain/mainchain_monitor.nim +++ b/beacon_chain/mainchain_monitor.nim @@ -1,12 +1,11 @@ import - chronos, web3, json, + chronos, web3, json, chronicles, spec/[datatypes, digest, crypto, beaconstate, helpers], ./extras type MainchainMonitor* = ref object web3Url: string - web3: Web3 depositContractAddress: Address genesisState: ref BeaconState @@ -21,6 +20,8 @@ type eth1Block: BlockHash eth1Data*: Eth1Data + runFut: Future[void] + QueueElement = (BlockHash, DepositData) @@ -39,21 +40,34 @@ contract(DepositContract): const MIN_GENESIS_TIME = 0 -proc updateEth1Data*(m: MainchainMonitor) {.async.} = - let ns = m.web3.contractSender(DepositContract, m.depositContractAddress) - - # TODO: use m.eth1Block for web3 calls - let cnt = await ns.get_deposit_count().call() - let htr = await ns.get_deposit_root().call() - m.eth1Data.deposit_count = bytes_to_int(array[8, byte](cnt)) - m.eth1Data.deposit_root.data = array[32, byte](htr) +proc updateEth1Data(m: MainchainMonitor, count: uint64, root: FixedBytes[32]) = + m.eth1Data.deposit_count = count + m.eth1Data.deposit_root.data = array[32, byte](root) m.eth1Data.block_hash.data = array[32, byte](m.eth1Block) -proc processDeposits(m: MainchainMonitor) {.async.} = +proc processDeposits(m: MainchainMonitor, web3: Web3) {.async.} = while true: let (blkHash, data) = await m.depositQueue.popFirst() + var blk: BlockObject + var depositCount: uint64 + var depositRoot: FixedBytes[32] + try: + blk = await web3.provider.eth_getBlockByHash(blkHash, false) + + let ns = web3.contractSender(DepositContract, m.depositContractAddress) + + # TODO: use m.eth1Block for web3 calls + let cnt = await ns.get_deposit_count().call() + depositRoot = await ns.get_deposit_root().call() + depositCount = bytes_to_int(array[8, byte](cnt)) + + except: + # Connection problem? Put the unprocessed deposit back to queue + m.depositQueue.addFirstNoWait((blkHash, data)) + raise + + debug "Got deposit from eth1", pubKey = data.pubKey - let blk = await m.web3.provider.eth_getBlockByHash(blkHash, false) let dep = datatypes.Deposit(data: data) m.pendingDeposits.add(dep) inc m.depositCount @@ -77,13 +91,13 @@ proc processDeposits(m: MainchainMonitor) {.async.} = m.genesisStateFut.complete() m.genesisStateFut = nil # TODO: Set curBlock to blk number - + # TODO: This should be progressing in more independent way. # The Eth1 cross-link can advance even when there are no new deposits. - await m.updateEth1Data + m.updateEth1Data(depositCount, depositRoot) proc isRunning*(m: MainchainMonitor): bool = - not m.web3.isNil + not m.runFut.isNil proc getGenesis*(m: MainchainMonitor): Future[BeaconState] {.async.} = if m.genesisState.isNil: @@ -95,11 +109,29 @@ proc getGenesis*(m: MainchainMonitor): Future[BeaconState] {.async.} = doAssert(not m.genesisState.isNil) return m.genesisState[] -proc run(m: MainchainMonitor) {.async.} = - m.web3 = await newWeb3(m.web3Url) - let ns = m.web3.contractSender(DepositContract, m.depositContractAddress) +proc getBlockNumber(web3: Web3, hash: BlockHash): Future[Quantity] {.async.} = + let blk = await web3.provider.eth_getBlockByHash(hash, false) + return blk.number - let s = await ns.subscribe(DepositEvent, %*{"fromBlock": m.eth1Block}) do(pubkey: Bytes48, withdrawalCredentials: Bytes32, amount: Bytes8, signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode): +proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} = + if delayBeforeStart != ZeroDuration: + await sleepAsync(delayBeforeStart) + + let web3 = await newWeb3(m.web3Url) + defer: await web3.close() + + let processFut = m.processDeposits(web3) + + web3.onDisconnect = proc() = + error "Web3 server disconnected", ulr = m.web3Url + processFut.cancel() + + let startBlkNum = await web3.getBlockNumber(m.eth1Block) + debug "Starting eth1 monitor", fromBlock = startBlkNum.uint64 + + let ns = web3.contractSender(DepositContract, m.depositContractAddress) + + let s = await ns.subscribe(DepositEvent, %*{"fromBlock": startBlkNum}) do(pubkey: Bytes48, withdrawalCredentials: Bytes32, amount: Bytes8, signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode): let blkHash = BlockHash.fromHex(j["blockHash"].getStr()) let amount = bytes_to_int(array[8, byte](amount)) @@ -111,14 +143,27 @@ proc run(m: MainchainMonitor) {.async.} = signature: ValidatorSig.init(array[96, byte](signature))))) try: - await m.processDeposits() + await processFut finally: await s.unsubscribe() - # await m.web3.close() - m.web3 = nil -proc start*(m: MainchainMonitor) = - asyncCheck m.run() +proc start(m: MainchainMonitor, delayBeforeStart: Duration) = + if m.runFut.isNil: + let runFut = m.run(delayBeforeStart) + m.runFut = runFut + runFut.addCallback() do(p: pointer): + if runFut.failed and runFut == m.runFut: + error "Mainchain monitor failure, restarting", err = runFut.error.msg + m.runFut = nil + m.start(5.seconds) + +proc start*(m: MainchainMonitor) {.inline.} = + m.start(0.seconds) + +proc stop*(m: MainchainMonitor) = + if not m.runFut.isNil: + m.runFut.cancel() + m.runFut = nil proc getPendingDeposits*(m: MainchainMonitor): seq[Deposit] = # This should be a simple accessor for the reference kept above diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 299adfa76..2518a4161 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 299adfa76f77384f4b3f90cded16e0f6a7e8f2c0 +Subproject commit 2518a4161f723405004c3e2a743fa08ec67404dc diff --git a/vendor/nim-json-rpc b/vendor/nim-json-rpc index 9214b095f..b6336cb72 160000 --- a/vendor/nim-json-rpc +++ b/vendor/nim-json-rpc @@ -1 +1 @@ -Subproject commit 9214b095fb0266e5cac44804663343cd8288a84c +Subproject commit b6336cb7257a02a1074aaab7891150f2ecc83fc9 diff --git a/vendor/nim-web3 b/vendor/nim-web3 index 7bc29e747..89d7a0c8f 160000 --- a/vendor/nim-web3 +++ b/vendor/nim-web3 @@ -1 +1 @@ -Subproject commit 7bc29e747004b8aa7988eab537029ecab73dcb45 +Subproject commit 89d7a0c8fd1eb0f749432bd7136d8f385351c48e