Address #1584 Don't keep all deposits in memory (persist them to disk)
This commit is contained in:
parent
bc8acdb9de
commit
e6320e5881
|
@ -1,13 +1,24 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
typetraits, stew/[results, objects, endians2],
|
||||
typetraits,
|
||||
stew/[results, objects, endians2, io2],
|
||||
serialization, chronicles, snappy,
|
||||
eth/db/kvstore,
|
||||
eth/db/[kvstore, kvstore_sqlite3],
|
||||
./spec/[datatypes, digest, crypto, state_transition],
|
||||
./ssz/[ssz_serialization, merkleization]
|
||||
|
||||
type
|
||||
DbSeq*[T] = object
|
||||
db: SqStoreRef
|
||||
name: string
|
||||
file: File
|
||||
endPos: uint64
|
||||
|
||||
DbMap*[K, V] = object
|
||||
db: SqStoreRef
|
||||
keyspace: int
|
||||
|
||||
BeaconChainDB* = ref object
|
||||
## Database storing resolved blocks and states - resolved blocks are such
|
||||
## blocks that form a chain back to the tail block.
|
||||
|
@ -23,6 +34,15 @@ type
|
|||
## database.
|
||||
backend: KvStoreRef
|
||||
|
||||
deposits*: DbSeq[DepositData]
|
||||
validators*: DbSeq[ImmutableValidatorData]
|
||||
validatorsByKey*: DbMap[ValidatorPubKey, ValidatorIndex]
|
||||
|
||||
Keyspaces* = enum
|
||||
defaultKeyspace = "kvstore"
|
||||
seqMetadata
|
||||
validatorIndexFromPubKey
|
||||
|
||||
DbKeyKind = enum
|
||||
kHashToState
|
||||
kHashToBlock
|
||||
|
@ -39,6 +59,9 @@ type
|
|||
kGenesisBlockRoot
|
||||
## Immutable reference to the network genesis state
|
||||
## (needed for satisfying requests to the beacon node API).
|
||||
kEth1PersistedTo
|
||||
## The latest ETH1 block hash which satisfied the follow distance and
|
||||
## had its deposits persisted to disk.
|
||||
|
||||
const
|
||||
maxDecompressedDbRecordSize = 16*1024*1024
|
||||
|
@ -71,8 +94,91 @@ func subkey(root: Eth2Digest, slot: Slot): array[40, byte] =
|
|||
|
||||
ret
|
||||
|
||||
proc init*(T: type BeaconChainDB, backend: KVStoreRef): BeaconChainDB =
|
||||
T(backend: backend)
|
||||
template panic =
|
||||
# TODO: Could we recover from a corrupted database?
|
||||
# Review all usages.
|
||||
raiseAssert "The database should not be corrupted"
|
||||
|
||||
proc createSeq*(db: SqStoreRef, baseDir, seqFile: string, T: type): DbSeq[T] =
|
||||
var endPos: uint64 = 0
|
||||
proc onData(data: openArray[byte]) =
|
||||
endPos = uint64.fromBytesBE(data)
|
||||
|
||||
discard db.get(
|
||||
ord seqMetadata,
|
||||
seqFile.toOpenArrayByte(0, seqFile.len - 1),
|
||||
onData).expect("working database")
|
||||
|
||||
let f = try: open(baseDir / seqFile, fmWrite)
|
||||
except IOError: panic()
|
||||
|
||||
let fileSize = try: getFileSize(f).uint64
|
||||
except IOError: panic()
|
||||
if endPos > fileSize: panic()
|
||||
|
||||
DbSeq[T](db: db, name: seqFile, file: f, endPos: endPos)
|
||||
|
||||
proc add*[T](s: var DbSeq[T], val: T) =
|
||||
var bytes = SSZ.encode(val)
|
||||
try:
|
||||
setFilePos(s.file, s.endPos.int64)
|
||||
write(s.file, bytes)
|
||||
except IOError:
|
||||
panic()
|
||||
s.endPos += bytes.len.uint64
|
||||
|
||||
proc len*[T](s: DbSeq[T]): uint64 =
|
||||
const elemSize = fixedPortionSize(T).uint64
|
||||
s.endPos div elemSize
|
||||
|
||||
proc get*[T](s: DbSeq[T], idx: uint64): T =
|
||||
const size = uint64 fixedPortionSize(T)
|
||||
var recordBytes: array[size, byte]
|
||||
|
||||
try:
|
||||
# TODO: check for invalid coercion here
|
||||
let pos = size * idx
|
||||
setFilePos(s.file, pos.int64)
|
||||
let bytesRead = readBytes(s.file, recordBytes, 0, size)
|
||||
# TODO Can we recover from a corrupted database?
|
||||
if bytesRead.uint64 != size: panic()
|
||||
except IOError:
|
||||
panic()
|
||||
|
||||
try:
|
||||
decode(SSZ, recordBytes, T)
|
||||
except SerializationError:
|
||||
panic()
|
||||
|
||||
proc flush*(s: DbSeq) =
|
||||
s.file.flushFile()
|
||||
s.db.put(
|
||||
ord seqMetadata,
|
||||
s.name.toOpenArrayByte(0, s.name.len - 1),
|
||||
s.endPos.toBytesBE()).expect("working database")
|
||||
|
||||
proc createMap*(db: SqStoreRef, keyspace: int;
|
||||
K, V: distinct type): DbMap[K, V] =
|
||||
DbMap[K, V](db: db, keyspace: keyspace)
|
||||
|
||||
proc insert*[K, V](m: DbMap[K, V], key: K, value: V) =
|
||||
m.db.put(m.keyspace, SSZ.encode key, SSZ.encode value).expect("working database")
|
||||
|
||||
proc contains*[K, V](m: DbMap[K, V], key: K): bool =
|
||||
contains(m.db, SSZ.encode key).expect("working database")
|
||||
|
||||
proc init*(T: type BeaconChainDB, dir: string): BeaconChainDB =
|
||||
let s = createPath(dir, 0o750)
|
||||
doAssert s.isOk # TODO Handle this in a better way
|
||||
|
||||
let sqliteStore = SqStoreRef.init(dir, "nbc", Keyspaces).expect(
|
||||
"working database")
|
||||
|
||||
T(backend: kvStore sqliteStore,
|
||||
deposits: createSeq(sqliteStore, dir, "deposits", DepositData),
|
||||
validators: createSeq(sqliteStore, dir, "validators", ImmutableValidatorData),
|
||||
validatorsByKey: createMap(sqliteStore, int validatorIndexFromPubKey,
|
||||
ValidatorPubKey, ValidatorIndex))
|
||||
|
||||
proc snappyEncode(inp: openArray[byte]): seq[byte] =
|
||||
try:
|
||||
|
@ -173,6 +279,9 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
|
|||
proc putGenesisBlockRoot*(db: BeaconChainDB, key: Eth2Digest) =
|
||||
db.put(subkey(kGenesisBlockRoot), key)
|
||||
|
||||
proc putEth1PersistedTo*(db: BeaconChainDB, key: Eth2Digest) =
|
||||
db.put(subkey(kEth1PersistedTo), key)
|
||||
|
||||
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[TrustedSignedBeaconBlock] =
|
||||
# We only store blocks that we trust in the database
|
||||
result.ok(TrustedSignedBeaconBlock())
|
||||
|
@ -216,7 +325,12 @@ proc getTailBlock*(db: BeaconChainDB): Opt[Eth2Digest] =
|
|||
db.get(subkey(kTailBlock), Eth2Digest)
|
||||
|
||||
proc getGenesisBlockRoot*(db: BeaconChainDB): Eth2Digest =
|
||||
db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect("The database must be seeded with the genesis state")
|
||||
db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect(
|
||||
"The database must be seeded with the genesis state")
|
||||
|
||||
proc getEth1PersistedTo*(db: BeaconChainDB): Eth2Digest =
|
||||
db.get(subkey(kGenesisBlockRoot), Eth2Digest).expect(
|
||||
"The database must be seeded with genesis eth1 data")
|
||||
|
||||
proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
|
||||
db.backend.contains(subkey(SignedBeaconBlock, key)).expect("working database")
|
||||
|
|
|
@ -77,7 +77,7 @@ proc init*(T: type BeaconNode,
|
|||
netKeys = getPersistentNetKeys(rng[], conf)
|
||||
nickname = if conf.nodeName == "auto": shortForm(netKeys)
|
||||
else: conf.nodeName
|
||||
db = BeaconChainDB.init(kvStore SqStoreRef.init(conf.databaseDir, "nbc").tryGet())
|
||||
db = BeaconChainDB.init(conf.databaseDir)
|
||||
|
||||
var
|
||||
mainchainMonitor: MainchainMonitor
|
||||
|
@ -156,6 +156,7 @@ proc init*(T: type BeaconNode,
|
|||
# TODO Could move this to a separate "GenesisMonitor" process or task
|
||||
# that would do only this - see Paul's proposal for this.
|
||||
mainchainMonitor = MainchainMonitor.init(
|
||||
db,
|
||||
conf.runtimePreset,
|
||||
web3,
|
||||
conf.depositContractAddress.get,
|
||||
|
@ -233,6 +234,7 @@ proc init*(T: type BeaconNode,
|
|||
conf.web3Url.len > 0 and
|
||||
conf.depositContractAddress.isSome:
|
||||
mainchainMonitor = MainchainMonitor.init(
|
||||
db,
|
||||
conf.runtimePreset,
|
||||
web3Provider(conf.web3Url),
|
||||
conf.depositContractAddress.get,
|
||||
|
|
|
@ -2,9 +2,8 @@ import
|
|||
std/[deques, tables, hashes, options, strformat],
|
||||
chronos, web3, web3/ethtypes as web3Types, json, chronicles,
|
||||
eth/common/eth_types, eth/async_utils,
|
||||
spec/[datatypes, digest, crypto, beaconstate, helpers, validator],
|
||||
network_metadata, merkle_minimal,
|
||||
beacon_node_status
|
||||
spec/[datatypes, digest, crypto, beaconstate, helpers, signatures],
|
||||
ssz, beacon_chain_db, network_metadata, merkle_minimal, beacon_node_status
|
||||
|
||||
from times import epochTime
|
||||
|
||||
|
@ -34,12 +33,14 @@ type
|
|||
Eth1BlockTimestamp* = uint64
|
||||
Eth1BlockHeader = web3Types.BlockHeader
|
||||
|
||||
Database* = object
|
||||
|
||||
Eth1Block* = ref object
|
||||
number*: Eth1BlockNumber
|
||||
timestamp*: Eth1BlockTimestamp
|
||||
deposits*: seq[Deposit]
|
||||
voteData*: Eth1Data
|
||||
knownGoodDepositsCount*: Option[uint64]
|
||||
knownValidatorsCount*: Option[uint64]
|
||||
|
||||
Eth1Chain* = object
|
||||
knownStart: Eth1Data
|
||||
|
@ -47,9 +48,10 @@ type
|
|||
|
||||
blocks: Deque[Eth1Block]
|
||||
blocksByHash: Table[BlockHash, Eth1Block]
|
||||
allDeposits*: seq[Deposit]
|
||||
|
||||
MainchainMonitor* = ref object
|
||||
db: BeaconChainDB
|
||||
|
||||
preset: RuntimePreset
|
||||
depositContractAddress: Address
|
||||
dataProviderFactory*: DataProviderFactory
|
||||
|
@ -181,11 +183,6 @@ func trimHeight(eth1Chain: var Eth1Chain, blockNumber: Eth1BlockNumber) =
|
|||
else:
|
||||
break
|
||||
|
||||
if eth1Chain.blocks.len > 0:
|
||||
eth1Chain.allDeposits.setLen(eth1Chain.blocks[^1].voteData.deposit_count)
|
||||
else:
|
||||
eth1Chain.allDeposits.setLen(0)
|
||||
|
||||
func isSuccessorBlock(eth1Chain: Eth1Chain, newBlock: Eth1Block): bool =
|
||||
let currentDepositCount = if eth1Chain.blocks.len == 0:
|
||||
eth1Chain.knownStart.deposit_count
|
||||
|
@ -199,18 +196,12 @@ func isSuccessorBlock(eth1Chain: Eth1Chain, newBlock: Eth1Block): bool =
|
|||
func addSuccessorBlock*(eth1Chain: var Eth1Chain, newBlock: Eth1Block): bool =
|
||||
result = isSuccessorBlock(eth1Chain, newBlock)
|
||||
if result:
|
||||
eth1Chain.allDeposits.add newBlock.deposits
|
||||
reset newBlock.deposits
|
||||
eth1Chain.blocks.addLast newBlock
|
||||
eth1Chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
|
||||
|
||||
func totalDeposits*(eth1Chain: Eth1Chain): int =
|
||||
for blk in eth1Chain.blocks:
|
||||
result += blk.deposits.len
|
||||
|
||||
func allDeposits*(eth1Chain: Eth1Chain): seq[Deposit] =
|
||||
for blk in eth1Chain.blocks:
|
||||
result.add blk.deposits
|
||||
proc allDepositsUpTo*(m: MainchainMonitor, totalDeposits: uint64): seq[Deposit] =
|
||||
for i in 0'u64 ..< totalDeposits:
|
||||
result.add Deposit(data: m.db.deposits.get(i))
|
||||
|
||||
func clear*(eth1Chain: var Eth1Chain) =
|
||||
eth1Chain = default(Eth1Chain)
|
||||
|
@ -320,7 +311,7 @@ proc getBlockNumber(p: DataProviderRef, hash: BlockHash): Future[Eth1BlockNumber
|
|||
return Eth1BlockNumber(blk.number)
|
||||
except CatchableError as exc:
|
||||
debug "Failed to get Eth1 block number from hash",
|
||||
hash = $hash, err = exc.msg
|
||||
hash = $hash, err = exc.msg
|
||||
raise exc
|
||||
|
||||
template readJsonField(j: JsonNode,
|
||||
|
@ -374,7 +365,7 @@ proc readJsonDeposits(depositsList: JsonNode): seq[Eth1Block] =
|
|||
method fetchDepositData*(p: Web3DataProviderRef,
|
||||
fromBlock, toBlock: Eth1BlockNumber): Future[seq[Eth1Block]]
|
||||
{.async, locks: 0.} =
|
||||
debug "Obtaining deposit log events", fromBlock, toBlock
|
||||
info "Obtaining deposit log events", fromBlock, toBlock
|
||||
return readJsonDeposits(await p.ns.getJsonLogs(DepositEvent,
|
||||
fromBlock = some blockId(fromBlock),
|
||||
toBlock = some blockId(toBlock)))
|
||||
|
@ -413,18 +404,19 @@ method onBlockHeaders*(p: Web3DataProviderRef,
|
|||
if p.blockHeadersSubscription != nil:
|
||||
await p.blockHeadersSubscription.unsubscribe()
|
||||
|
||||
debug "Waiting for new Eth1 block headers"
|
||||
info "Waiting for new Eth1 block headers"
|
||||
|
||||
p.blockHeadersSubscription = await p.web3.subscribeForBlockHeaders(
|
||||
blockHeaderHandler, errorHandler)
|
||||
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#get_eth1_data
|
||||
func getBlockProposalData*(preset: RuntimePreset, eth1Chain: Eth1Chain,
|
||||
proc getBlockProposalData*(m: MainchainMonitor,
|
||||
state: BeaconState): (Eth1Data, seq[Deposit]) =
|
||||
template voteForNoChange() =
|
||||
return (state.eth1_data, newSeq[Deposit]())
|
||||
result[0] = state.eth1_data
|
||||
return
|
||||
|
||||
let prevBlock = eth1Chain.findBlock(state.eth1_data)
|
||||
let prevBlock = m.eth1Chain.findBlock(state.eth1_data)
|
||||
if prevBlock == nil:
|
||||
# The Eth1 block currently referenced in the BeaconState is unknown to us.
|
||||
# This situation is not specifically covered in the honest validator spec,
|
||||
|
@ -437,83 +429,108 @@ func getBlockProposalData*(preset: RuntimePreset, eth1Chain: Eth1Chain,
|
|||
|
||||
var otherVotesCountTable = initCountTable[Eth1Block]()
|
||||
for vote in state.eth1_data_votes:
|
||||
let eth1Block = eth1Chain.findBlock(vote)
|
||||
if eth1Block != nil and is_candidate_block(preset, eth1Block, periodStart):
|
||||
let eth1Block = m.eth1Chain.findBlock(vote)
|
||||
if eth1Block != nil and is_candidate_block(m.preset, eth1Block, periodStart):
|
||||
otherVotesCountTable.inc eth1Block
|
||||
|
||||
var ourVote: Eth1Block
|
||||
if otherVotesCountTable.len > 0:
|
||||
ourVote = otherVotesCountTable.largest.key
|
||||
let (winningBlock, votes) = otherVotesCountTable.largest
|
||||
result[0] = winningBlock.voteData
|
||||
if uint64((votes + 1) * 2) > SLOTS_PER_ETH1_VOTING_PERIOD:
|
||||
result[1] = m.eth1Chain.getDepositsInRange(prevBlock.number,
|
||||
winningBlock.number)
|
||||
# TODO This can be significantly more optimal
|
||||
var newAllDeposits = m.allDepositsUpTo(winningBlock.voteData.deposit_count)
|
||||
newAllDeposits.add result[1]
|
||||
attachMerkleProofs(newAllDeposits)
|
||||
for i in 0 ..< result[1].len:
|
||||
result[1][i].proof = newAllDeposits[newAllDeposits.len - result[1].len + i].proof
|
||||
else:
|
||||
ourVote = eth1Chain.latestCandidateBlock(preset, periodStart)
|
||||
if ourVote == nil:
|
||||
let latestBlock = m.eth1Chain.latestCandidateBlock(m.preset, periodStart)
|
||||
if latestBlock == nil:
|
||||
voteForNoChange()
|
||||
|
||||
(ourVote.voteData, eth1Chain.getDepositsInRange(prevBlock.number, ourVote.number))
|
||||
|
||||
template getBlockProposalData*(m: MainchainMonitor, state: BeaconState): untyped =
|
||||
getBlockProposalData(m.preset, m.eth1Chain, state)
|
||||
else:
|
||||
result[0] = latestBlock.voteData
|
||||
return
|
||||
|
||||
proc init*(T: type MainchainMonitor,
|
||||
db: BeaconChainDB,
|
||||
preset: RuntimePreset,
|
||||
dataProviderFactory: DataProviderFactory,
|
||||
depositContractAddress: Eth1Address,
|
||||
startPosition: Eth1Data): T =
|
||||
T(preset: preset,
|
||||
T(db: db,
|
||||
preset: preset,
|
||||
depositQueue: newAsyncQueue[Eth1BlockHeader](),
|
||||
dataProviderFactory: dataProviderFactory,
|
||||
depositContractAddress: Address depositContractAddress,
|
||||
eth1Chain: Eth1Chain(knownStart: startPosition))
|
||||
|
||||
proc isCandidateForGenesis(preset: RuntimePreset,
|
||||
timeNow: float,
|
||||
blk: Eth1Block): bool =
|
||||
let followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK) * preset.ETH1_FOLLOW_DISTANCE
|
||||
if float(blk.timestamp + followDistanceInSeconds) > timeNow:
|
||||
return false
|
||||
|
||||
if genesis_time_from_eth1_timestamp(preset, blk.timestamp) < preset.MIN_GENESIS_TIME:
|
||||
return false
|
||||
|
||||
if blk.knownGoodDepositsCount.isSome:
|
||||
blk.knownGoodDepositsCount.get >= preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT
|
||||
else:
|
||||
blk.voteData.deposit_count >= preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT
|
||||
|
||||
proc minGenesisCandidateBlockIdx(m: MainchainMonitor): Option[int]
|
||||
{.raises: [Defect].} =
|
||||
proc persistFinalizedBlocks(m: MainchainMonitor, timeNow: float): tuple[
|
||||
genesisBlock: Eth1Block,
|
||||
previousBlock: Eth1Block
|
||||
] =
|
||||
if m.eth1Chain.blocks.len == 0:
|
||||
return
|
||||
|
||||
let now = epochTime()
|
||||
if not isCandidateForGenesis(m.preset, now, m.eth1Chain.blocks.peekLast):
|
||||
return
|
||||
let followDistanceInSeconds = uint64(SECONDS_PER_ETH1_BLOCK) *
|
||||
m.preset.ETH1_FOLLOW_DISTANCE
|
||||
var prevBlock: Eth1Block
|
||||
|
||||
var candidatePos = m.eth1Chain.blocks.len - 1
|
||||
while candidatePos > 1:
|
||||
if not isCandidateForGenesis(m.preset, now, m.eth1Chain.blocks[candidatePos - 1]):
|
||||
break
|
||||
dec candidatePos
|
||||
# TODO: The DB operations should be executed as a transaction here
|
||||
block: # TODO Begin Transaction
|
||||
while true:
|
||||
let blk = m.eth1Chain.blocks.peekFirst
|
||||
if float(blk.timestamp + followDistanceInSeconds) > timeNow:
|
||||
break
|
||||
|
||||
return some(candidatePos)
|
||||
for deposit in blk.deposits:
|
||||
m.db.deposits.add deposit.data
|
||||
|
||||
proc createBeaconStateAux(preset: RuntimePreset,
|
||||
eth1Block: Eth1Block,
|
||||
deposits: var openarray[Deposit]): BeaconStateRef =
|
||||
if verify_deposit_signature(m.preset, deposit.data):
|
||||
let pubkey = deposit.data.pubkey
|
||||
if pubkey notin m.db.validatorsByKey:
|
||||
let idx = m.db.validators.len
|
||||
m.db.validators.add ImmutableValidatorData(
|
||||
pubkey: pubkey,
|
||||
withdrawal_credentials: deposit.data.withdrawal_credentials)
|
||||
m.db.validatorsByKey.insert(pubkey, ValidatorIndex idx)
|
||||
|
||||
blk.knownValidatorsCount = some m.db.validators.len
|
||||
|
||||
discard m.eth1Chain.blocks.popFirst()
|
||||
m.eth1Chain.blocksByHash.del blk.voteData.block_hash.asBlockHash
|
||||
|
||||
let blockGenesisTime = genesis_time_from_eth1_timestamp(m.preset,
|
||||
blk.timestamp)
|
||||
if blockGenesisTime >= m.preset.MIN_GENESIS_TIME and
|
||||
m.db.validators.len >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
|
||||
result = (blk, prevBlock)
|
||||
|
||||
prevBlock = blk
|
||||
|
||||
if prevBlock != nil:
|
||||
# TODO commit transaction
|
||||
m.db.validators.flush()
|
||||
m.db.deposits.flush()
|
||||
m.db.putEth1PersistedTo prevBlock.voteData.block_hash
|
||||
|
||||
# TODO Commit
|
||||
|
||||
proc createGenesisState(m: MainchainMonitor, eth1Block: Eth1Block): BeaconStateRef =
|
||||
notice "Generating genesis state",
|
||||
blockNum = eth1Block.number,
|
||||
blockHash = eth1Block.voteData.block_hash,
|
||||
totalDeposits = eth1Block.voteData.deposit_count,
|
||||
activeValidators = eth1Block.knownValidatorsCount.get
|
||||
|
||||
var deposits = m.allDepositsUpTo(eth1Block.voteData.deposit_count)
|
||||
attachMerkleProofs deposits
|
||||
result = initialize_beacon_state_from_eth1(preset,
|
||||
eth1Block.voteData.block_hash,
|
||||
eth1Block.timestamp.uint64,
|
||||
deposits, {})
|
||||
var cache = StateCache()
|
||||
let activeValidators = count_active_validators(result[], GENESIS_EPOCH, cache)
|
||||
eth1Block.knownGoodDepositsCount = some activeValidators
|
||||
|
||||
proc createBeaconState(m: MainchainMonitor, eth1Block: Eth1Block): BeaconStateRef =
|
||||
createBeaconStateAux(
|
||||
m.preset,
|
||||
eth1Block,
|
||||
m.eth1Chain.allDeposits.toOpenArray(0, int(eth1Block.voteData.deposit_count - 1)))
|
||||
initialize_beacon_state_from_eth1(m.preset,
|
||||
eth1Block.voteData.block_hash,
|
||||
eth1Block.timestamp.uint64,
|
||||
deposits, {})
|
||||
|
||||
proc signalGenesis(m: MainchainMonitor, genesisState: BeaconStateRef) =
|
||||
m.genesisState = genesisState
|
||||
|
@ -590,58 +607,37 @@ proc checkForGenesisLoop(m: MainchainMonitor) {.async.} =
|
|||
return
|
||||
|
||||
try:
|
||||
let genesisCandidateIdx = m.minGenesisCandidateBlockIdx
|
||||
if genesisCandidateIdx.isSome:
|
||||
let
|
||||
genesisCandidateIdx = genesisCandidateIdx.get
|
||||
genesisCandidate = m.eth1Chain.blocks[genesisCandidateIdx]
|
||||
# TODO: check for a stale monitor
|
||||
let
|
||||
now = epochTime()
|
||||
(genesisCandidate, genesisParent) = m.persistFinalizedBlocks(now)
|
||||
|
||||
notice "Generating state for candidate block for genesis",
|
||||
blockNum = genesisCandidate.number,
|
||||
blockHash = genesisCandidate.voteData.block_hash,
|
||||
potentialDeposits = genesisCandidate.voteData.deposit_count
|
||||
if genesisCandidate != nil:
|
||||
# We have a candidate state on our hands, but our current Eth1Chain
|
||||
# may consist only of blocks that have deposits attached to them
|
||||
# while the real genesis may have happened in a block without any
|
||||
# deposits (triggered by MIN_GENESIS_TIME).
|
||||
#
|
||||
# This can happen when the beacon node is launched after the genesis
|
||||
# event. We take a short cut when constructing the initial Eth1Chain
|
||||
# by downloading only deposit log entries. Thus, we'll see all the
|
||||
# blocks with deposits, but not the regular blocks in between.
|
||||
#
|
||||
# We'll handle this special case below by examing whether we are in
|
||||
# this potential scenario and we'll use a fast guessing algorith to
|
||||
# discover the ETh1 block with minimal valid genesis time.
|
||||
if genesisParent != nil:
|
||||
if genesisParent.knownValidatorsCount.get >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and
|
||||
genesisParent.number - genesisParent.number > 1:
|
||||
let genesisBlock = await m.findGenesisBlockInRange(genesisParent, genesisCandidate)
|
||||
if genesisBlock.number != genesisCandidate.number:
|
||||
m.signalGenesis m.createGenesisState(genesisBlock)
|
||||
return
|
||||
|
||||
let
|
||||
candidateState = m.createBeaconState(genesisCandidate)
|
||||
let candidateState = m.createGenesisState(genesisCandidate)
|
||||
m.signalGenesis candidateState
|
||||
return
|
||||
|
||||
if genesisCandidate.knownGoodDepositsCount.get >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT:
|
||||
# We have a candidate state on our hands, but our current Eth1Chain
|
||||
# may consist only of blocks that have deposits attached to them
|
||||
# while the real genesis may have happened in a block without any
|
||||
# deposits (triggered by MIN_GENESIS_TIME).
|
||||
#
|
||||
# This can happen when the beacon node is launched after the genesis
|
||||
# event. We take a short cut when constructing the initial Eth1Chain
|
||||
# by downloading only deposit log entries. Thus, we'll see all the
|
||||
# blocks with deposits, but not the regular blocks in between.
|
||||
#
|
||||
# We'll handle this special case below by examing whether we are in
|
||||
# this potential scenario and we'll use a fast guessing algorith to
|
||||
# discover the ETh1 block with minimal valid genesis time.
|
||||
if genesisCandidateIdx > 0:
|
||||
let preceedingEth1Block = m.eth1Chain.blocks[genesisCandidateIdx - 1]
|
||||
if preceedingEth1Block.voteData.deposit_root == genesisCandidate.voteData.deposit_root:
|
||||
preceedingEth1Block.knownGoodDepositsCount = genesisCandidate.knownGoodDepositsCount
|
||||
else:
|
||||
discard m.createBeaconState(preceedingEth1Block)
|
||||
|
||||
if preceedingEth1Block.knownGoodDepositsCount.get >= m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and
|
||||
genesisCandidate.number - preceedingEth1Block.number > 1:
|
||||
let genesisBlock = await m.findGenesisBlockInRange(preceedingEth1Block, genesisCandidate)
|
||||
if genesisBlock.number != genesisCandidate.number:
|
||||
m.signalGenesis m.createBeaconState(genesisBlock)
|
||||
return
|
||||
|
||||
m.signalGenesis candidateState
|
||||
return
|
||||
else:
|
||||
notice "Eth2 genesis candidate block rejected",
|
||||
`block` = shortLog(genesisCandidate),
|
||||
validDeposits = genesisCandidate.knownGoodDepositsCount.get,
|
||||
needed = m.preset.MIN_GENESIS_ACTIVE_VALIDATOR_COUNT
|
||||
else:
|
||||
# TODO: check for a stale monitor
|
||||
discard
|
||||
except CatchableError as err:
|
||||
debug "Unexpected error in checkForGenesisLoop", err = err.msg
|
||||
|
||||
|
@ -678,15 +674,15 @@ func latestEth1Data(eth1Chain: Eth1Chain): Eth1Data =
|
|||
func knownInvalidDepositsCount(eth1Chain: Eth1Chain): uint64 =
|
||||
for i in countdown(eth1Chain.blocks.len - 1, 0):
|
||||
let blk = eth1Chain.blocks[i]
|
||||
if blk.knownGoodDepositsCount.isSome:
|
||||
return blk.voteData.deposit_count - blk.knownGoodDepositsCount.get
|
||||
if blk.knownValidatorsCount.isSome:
|
||||
return blk.voteData.deposit_count - blk.knownValidatorsCount.get
|
||||
|
||||
return 0
|
||||
|
||||
func maxValidDeposits(eth1Chain: Eth1Chain): uint64 =
|
||||
if eth1Chain.blocks.len > 0:
|
||||
let lastBlock = eth1Chain.blocks[^1]
|
||||
lastBlock.knownGoodDepositsCount.get(
|
||||
lastBlock.knownValidatorsCount.get(
|
||||
lastBlock.voteData.deposit_count - eth1Chain.knownInvalidDepositsCount)
|
||||
else:
|
||||
0
|
||||
|
@ -697,7 +693,7 @@ proc processDeposits(m: MainchainMonitor,
|
|||
# Please note that this code is using a queue to guarantee the
|
||||
# strict serial order of processing of deposits. If we had the
|
||||
# same code embedded in the deposit contracts events handler,
|
||||
# it could easily re-order the steps due to the intruptable
|
||||
# it could easily re-order the steps due to the interruptible
|
||||
# interleaved execution of async code.
|
||||
while true:
|
||||
m.checkIfShouldStopMainchainMonitor()
|
||||
|
@ -734,7 +730,7 @@ proc processDeposits(m: MainchainMonitor,
|
|||
voteData: latestEth1Data)
|
||||
else:
|
||||
template logBlockProcessed(blk) =
|
||||
debug "Eth1 block processed",
|
||||
info "Eth1 block processed",
|
||||
`block` = shortLog(blk), totalDeposits = blk.voteData.deposit_count
|
||||
|
||||
await dataProvider.fetchBlockDetails(eth1Blocks[0])
|
||||
|
|
|
@ -342,6 +342,11 @@ type
|
|||
BeaconStateRef* = ref BeaconState not nil
|
||||
NilableBeaconStateRef* = ref BeaconState
|
||||
|
||||
# Please note that this type is not part of the spec
|
||||
ImmutableValidatorData* = object
|
||||
pubkey*: ValidatorPubKey
|
||||
withdrawal_credentials*: Eth2Digest
|
||||
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#validator
|
||||
Validator* = object
|
||||
pubkey*: ValidatorPubKey
|
||||
|
|
|
@ -406,7 +406,6 @@ func get_attestation_deltas(
|
|||
let
|
||||
total_balance = get_total_active_balance(state, cache)
|
||||
|
||||
|
||||
get_source_deltas(state, total_balance, rewards, penalties, cache)
|
||||
get_target_deltas(state, total_balance, rewards, penalties, cache)
|
||||
get_head_deltas(state, total_balance, rewards, penalties, cache)
|
||||
|
|
|
@ -23,7 +23,7 @@ import
|
|||
conf, time, validator_pool,
|
||||
attestation_pool, exit_pool, block_pools/[spec_cache, chain_dag, clearance],
|
||||
eth2_network, keystore_management, beacon_node_common, beacon_node_types,
|
||||
nimbus_binary_common, mainchain_monitor, version, ssz/merkleization, interop,
|
||||
nimbus_binary_common, mainchain_monitor, version, ssz/merkleization,
|
||||
attestation_aggregation, sync_manager, sszdump,
|
||||
validator_slashing_protection
|
||||
|
||||
|
@ -199,8 +199,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
|
|||
node.chainDag.tmpState, head.atSlot(slot)):
|
||||
let (eth1data, deposits) =
|
||||
if node.mainchainMonitor.isNil:
|
||||
(get_eth1data_stub(state.eth1_deposit_index, slot.compute_epoch_at_slot()),
|
||||
newSeq[Deposit]())
|
||||
(state.eth1_data, newSeq[Deposit]())
|
||||
else:
|
||||
node.mainchainMonitor.getBlockProposalData(state)
|
||||
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
import
|
||||
confutils, stats, chronicles, strformat, tables,
|
||||
stew/byteutils,
|
||||
import
|
||||
os, stats, strformat, tables,
|
||||
chronicles, confutils, stew/byteutils, eth/db/kvstore_sqlite3,
|
||||
../beacon_chain/network_metadata,
|
||||
../beacon_chain/[beacon_chain_db, extras],
|
||||
../beacon_chain/block_pools/[chain_dag],
|
||||
../beacon_chain/spec/[crypto, datatypes, digest, helpers,
|
||||
state_transition, presets],
|
||||
../beacon_chain/sszdump, ../research/simutils,
|
||||
eth/db/[kvstore, kvstore_sqlite3]
|
||||
../beacon_chain/sszdump, ../research/simutils
|
||||
|
||||
type Timers = enum
|
||||
tInit = "Initialize DB"
|
||||
|
@ -85,10 +84,8 @@ proc cmdBench(conf: DbConf, runtimePreset: RuntimePreset) =
|
|||
|
||||
echo "Opening database..."
|
||||
let
|
||||
db = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(conf.databaseDir.string, "nbc").tryGet())
|
||||
dbBenchmark = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(".", "benchmark").tryGet())
|
||||
db = BeaconChainDB.init(conf.databaseDir.string)
|
||||
dbBenchmark = BeaconChainDB.init("benchmark")
|
||||
defer: db.close()
|
||||
|
||||
if not ChainDAGRef.isInitialized(db):
|
||||
|
@ -140,9 +137,7 @@ proc cmdBench(conf: DbConf, runtimePreset: RuntimePreset) =
|
|||
printTimers(false, timers)
|
||||
|
||||
proc cmdDumpState(conf: DbConf) =
|
||||
let
|
||||
db = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(conf.databaseDir.string, "nbc").tryGet())
|
||||
let db = BeaconChainDB.init(conf.databaseDir.string)
|
||||
defer: db.close()
|
||||
|
||||
for stateRoot in conf.stateRoot:
|
||||
|
@ -157,9 +152,7 @@ proc cmdDumpState(conf: DbConf) =
|
|||
echo "Couldn't load ", stateRoot, ": ", e.msg
|
||||
|
||||
proc cmdDumpBlock(conf: DbConf) =
|
||||
let
|
||||
db = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(conf.databaseDir.string, "nbc").tryGet())
|
||||
let db = BeaconChainDB.init(conf.databaseDir.string)
|
||||
defer: db.close()
|
||||
|
||||
for blockRoot in conf.blockRootx:
|
||||
|
@ -245,10 +238,9 @@ proc copyPrunedDatabase(
|
|||
|
||||
proc cmdPrune(conf: DbConf) =
|
||||
let
|
||||
db = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(conf.databaseDir.string, "nbc").tryGet())
|
||||
copyDb = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(conf.databaseDir.string, "nbc_pruned").tryGet())
|
||||
db = BeaconChainDB.init(conf.databaseDir.string)
|
||||
# TODO: add the destination as CLI paramter
|
||||
copyDb = BeaconChainDB.init("pruned_db")
|
||||
|
||||
defer:
|
||||
db.close()
|
||||
|
@ -258,9 +250,7 @@ proc cmdPrune(conf: DbConf) =
|
|||
|
||||
proc cmdRewindState(conf: DbConf, runtimePreset: RuntimePreset) =
|
||||
echo "Opening database..."
|
||||
let
|
||||
db = BeaconChainDB.init(
|
||||
kvStore SqStoreRef.init(conf.databaseDir.string, "nbc").tryGet())
|
||||
let db = BeaconChainDB.init(conf.databaseDir.string)
|
||||
defer: db.close()
|
||||
|
||||
if not ChainDAGRef.isInitialized(db):
|
||||
|
|
|
@ -15,9 +15,9 @@
|
|||
# a database, as if a real node was running.
|
||||
|
||||
import
|
||||
confutils, chronicles, stats, times,
|
||||
strformat,
|
||||
options, random, tables,
|
||||
confutils, chronicles, stats, times, strformat,
|
||||
options, random, tables, os,
|
||||
eth/db/kvstore_sqlite3,
|
||||
../tests/[testblockutil],
|
||||
../beacon_chain/spec/[beaconstate, crypto, datatypes, digest, presets,
|
||||
helpers, validator, signatures, state_transition],
|
||||
|
@ -26,7 +26,6 @@ import
|
|||
interop, validator_pool],
|
||||
../beacon_chain/block_pools/[
|
||||
spec_cache, chain_dag, quarantine, clearance],
|
||||
eth/db/[kvstore, kvstore_sqlite3],
|
||||
../beacon_chain/ssz/[merkleization, ssz_serialization],
|
||||
./simutils
|
||||
|
||||
|
@ -50,8 +49,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
|
|||
|
||||
echo "Starting simulation..."
|
||||
|
||||
let
|
||||
db = BeaconChainDB.init(kvStore SqStoreRef.init(".", "block_sim").tryGet())
|
||||
let db = BeaconChainDB.init("block_sim_db")
|
||||
defer: db.close()
|
||||
|
||||
ChainDAGRef.preInit(db, state[].data, state[].data, genesisBlock)
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit de2d43a7e7afb7b094ca251bdbbd58fbf47df031
|
||||
Subproject commit c1037213910ac5ec2156740698d7ecc9dcbfbaeb
|
Loading…
Reference in New Issue