213 lines
7.1 KiB
Nim
213 lines
7.1 KiB
Nim
import
|
|
chronos, web3, json, chronicles,
|
|
spec/[datatypes, digest, crypto, beaconstate, helpers]
|
|
|
|
type
|
|
MainchainMonitor* = ref object
|
|
web3Url: string
|
|
depositContractAddress: Address
|
|
|
|
genesisState: ref BeaconState
|
|
genesisStateFut: Future[void]
|
|
|
|
pendingDeposits: seq[Deposit]
|
|
depositCount: uint64
|
|
|
|
curBlock: uint64
|
|
depositQueue: AsyncQueue[QueueElement]
|
|
|
|
eth1Block: BlockHash
|
|
eth1Data*: Eth1Data
|
|
|
|
runFut: Future[void]
|
|
|
|
QueueElement = (BlockHash, DepositData)
|
|
|
|
proc init*(
|
|
T: type MainchainMonitor,
|
|
web3Url, depositContractAddress: string,
|
|
startBlock: Eth2Digest): T =
|
|
T(
|
|
web3Url: web3Url,
|
|
depositContractAddress: Address.fromHex(depositContractAddress),
|
|
depositQueue: newAsyncQueue[QueueElement](),
|
|
eth1Block: BlockHash(startBlock.data),
|
|
)
|
|
|
|
contract(DepositContract):
|
|
proc deposit(pubkey: Bytes48, withdrawalCredentials: Bytes32, signature: Bytes96, deposit_data_root: FixedBytes[32])
|
|
proc get_deposit_root(): FixedBytes[32]
|
|
proc get_deposit_count(): Bytes8
|
|
proc DepositEvent(pubkey: Bytes48, withdrawalCredentials: Bytes32, amount: Bytes8, signature: Bytes96, index: Bytes8) {.event.}
|
|
|
|
const MIN_GENESIS_TIME = 0
|
|
|
|
proc updateEth1Data(m: MainchainMonitor, count: uint64, root: FixedBytes[32]) =
|
|
m.eth1Data.deposit_count = count
|
|
m.eth1Data.deposit_root.data = array[32, byte](root)
|
|
m.eth1Data.block_hash.data = array[32, byte](m.eth1Block)
|
|
|
|
proc processDeposits(m: MainchainMonitor, web3: Web3) {.async.} =
|
|
while true:
|
|
let (blkHash, data) = await m.depositQueue.popFirst()
|
|
var blk: BlockObject
|
|
var depositCount: uint64
|
|
var depositRoot: FixedBytes[32]
|
|
try:
|
|
blk = await web3.provider.eth_getBlockByHash(blkHash, false)
|
|
|
|
let ns = web3.contractSender(DepositContract, m.depositContractAddress)
|
|
|
|
# TODO: use m.eth1Block for web3 calls
|
|
let cnt = await ns.get_deposit_count().call()
|
|
depositRoot = await ns.get_deposit_root().call()
|
|
depositCount = bytes_to_int(array[8, byte](cnt))
|
|
|
|
except:
|
|
# Connection problem? Put the unprocessed deposit back to queue
|
|
m.depositQueue.addFirstNoWait((blkHash, data))
|
|
raise
|
|
|
|
debug "Got deposit from eth1", pubKey = data.pubKey
|
|
|
|
let dep = datatypes.Deposit(data: data)
|
|
m.pendingDeposits.add(dep)
|
|
inc m.depositCount
|
|
m.eth1Block = blkHash
|
|
|
|
if m.pendingDeposits.len >= SLOTS_PER_EPOCH and
|
|
m.pendingDeposits.len >= MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and
|
|
blk.timestamp.uint64 >= MIN_GENESIS_TIME.uint64:
|
|
# This block is a genesis candidate
|
|
var h: Eth2Digest
|
|
h.data = array[32, byte](blkHash)
|
|
let startTime = blk.timestamp.uint64
|
|
var s = initialize_beacon_state_from_eth1(
|
|
h, startTime, m.pendingDeposits, {})
|
|
|
|
if is_valid_genesis_state(s):
|
|
# https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
|
|
s.genesis_time = startTime
|
|
|
|
m.pendingDeposits.setLen(0)
|
|
m.genesisState.new()
|
|
m.genesisState[] = s
|
|
if not m.genesisStateFut.isNil:
|
|
m.genesisStateFut.complete()
|
|
m.genesisStateFut = nil
|
|
# TODO: Set curBlock to blk number
|
|
|
|
# TODO: This should be progressing in more independent way.
|
|
# The Eth1 cross-link can advance even when there are no new deposits.
|
|
m.updateEth1Data(depositCount, depositRoot)
|
|
|
|
proc isRunning*(m: MainchainMonitor): bool =
|
|
not m.runFut.isNil
|
|
|
|
proc getGenesis*(m: MainchainMonitor): Future[BeaconState] {.async.} =
|
|
if m.genesisState.isNil:
|
|
if m.genesisStateFut.isNil:
|
|
m.genesisStateFut = newFuture[void]("getGenesis")
|
|
await m.genesisStateFut
|
|
m.genesisStateFut = nil
|
|
|
|
doAssert(not m.genesisState.isNil)
|
|
return m.genesisState[]
|
|
|
|
proc getBlockNumber(web3: Web3, hash: BlockHash): Future[Quantity] {.async.} =
|
|
debug "Querying block number", hash = $hash
|
|
|
|
try:
|
|
let blk = await web3.provider.eth_getBlockByHash(hash, false)
|
|
return blk.number
|
|
except CatchableError as exc:
|
|
# TODO this doesn't make too much sense really, but what would be a
|
|
# reasonable behavior? no idea - the whole algorithm needs to be
|
|
# rewritten to match the spec.
|
|
notice "Failed to get block number from hash, using current block instead",
|
|
hash = $hash, err = exc.msg
|
|
return await web3.provider.eth_blockNumber()
|
|
|
|
proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
|
|
if delayBeforeStart != ZeroDuration:
|
|
await sleepAsync(delayBeforeStart)
|
|
|
|
let web3 = await newWeb3(m.web3Url)
|
|
defer: await web3.close()
|
|
|
|
let processFut = m.processDeposits(web3)
|
|
|
|
web3.onDisconnect = proc() =
|
|
error "Web3 server disconnected", ulr = m.web3Url
|
|
processFut.cancel()
|
|
|
|
# TODO this needs to implement follow distance and the rest of the honest
|
|
# validator spec..
|
|
|
|
let startBlkNum = await web3.getBlockNumber(m.eth1Block)
|
|
|
|
notice "Monitoring eth1 deposits",
|
|
fromBlock = startBlkNum.uint64,
|
|
contract = $m.depositContractAddress,
|
|
url = m.web3Url
|
|
|
|
let ns = web3.contractSender(DepositContract, m.depositContractAddress)
|
|
|
|
let s = await ns.subscribe(DepositEvent, %*{"fromBlock": startBlkNum}) do(
|
|
pubkey: Bytes48,
|
|
withdrawalCredentials: Bytes32,
|
|
amount: Bytes8,
|
|
signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode):
|
|
try:
|
|
let blkHash = BlockHash.fromHex(j["blockHash"].getStr())
|
|
let amount = bytes_to_int(array[8, byte](amount))
|
|
|
|
m.depositQueue.addLastNoWait((blkHash,
|
|
DepositData(pubkey: ValidatorPubKey.init(array[48, byte](pubkey)),
|
|
withdrawal_credentials: Eth2Digest(data: array[32, byte](withdrawalCredentials)),
|
|
amount: amount,
|
|
signature: ValidatorSig.init(array[96, byte](signature)))))
|
|
except CatchableError as exc:
|
|
warn "Received invalid deposit", err = exc.msg, j
|
|
|
|
try:
|
|
await processFut
|
|
finally:
|
|
await s.unsubscribe()
|
|
|
|
proc start(m: MainchainMonitor, delayBeforeStart: Duration) =
|
|
if m.runFut.isNil:
|
|
let runFut = m.run(delayBeforeStart)
|
|
m.runFut = runFut
|
|
runFut.addCallback() do(p: pointer):
|
|
if runFut.failed and runFut == m.runFut:
|
|
error "Mainchain monitor failure, restarting", err = runFut.error.msg
|
|
m.runFut = nil
|
|
m.start(5.seconds)
|
|
|
|
proc start*(m: MainchainMonitor) {.inline.} =
|
|
m.start(0.seconds)
|
|
|
|
proc stop*(m: MainchainMonitor) =
|
|
if not m.runFut.isNil:
|
|
m.runFut.cancel()
|
|
m.runFut = nil
|
|
|
|
proc getPendingDeposits*(m: MainchainMonitor): seq[Deposit] =
|
|
# This should be a simple accessor for the reference kept above
|
|
m.pendingDeposits
|
|
|
|
# TODO update after spec change removed Specials
|
|
# iterator getValidatorActions*(m: MainchainMonitor,
|
|
# fromBlock, toBlock: Eth2Digest): SpecialRecord =
|
|
# # It's probably better if this doesn't return a SpecialRecord, but
|
|
# # rather a more readable description of the change that can be packed
|
|
# # in a SpecialRecord by the client of the API.
|
|
# discard
|
|
|
|
proc getLatestEth1BlockHash*(url: string): Future[Eth2Digest] {.async.} =
|
|
let web3 = await newWeb3(url)
|
|
let blk = await web3.provider.eth_getBlockByNumber("latest", false)
|
|
result.data = array[32, byte](blk.hash)
|
|
await web3.close()
|