start untangling beaconnode (#589)

* Move BeaconNode type to its own file (fewer imports)
* disentangle sync protocol/request manager
* fix some old Nim-isms
* de-fear some logs (downgrade alarming error-level messages to debug/notice)
* simplify eth1 data production
* add stack tracing to release builds
* drop release compile flag for testnet
Jacek Sieka 2019-11-25 15:36:25 +01:00 committed by GitHub
parent 27da080c69
commit fd4de5de0f
14 changed files with 115 additions and 134 deletions

View File

@ -1,5 +1,5 @@
import
deques, options, sequtils, tables,
deques, sequtils, tables,
chronicles, stew/bitseqs, json_serialization/std/sets,
./spec/[beaconstate, datatypes, crypto, digest, helpers, validator],
./extras, ./ssz, ./block_pool,

View File

@ -40,6 +40,34 @@ declareCounter beacon_blocks_received,
logScope: topics = "beacnde"
type
BeaconNode = ref object
nickname: string
network: Eth2Node
forkVersion: array[4, byte]
networkIdentity: Eth2NodeIdentity
requestManager: RequestManager
isBootstrapNode: bool
bootstrapNodes: seq[BootstrapAddr]
db: BeaconChainDB
config: BeaconNodeConf
attachedValidators: ValidatorPool
blockPool: BlockPool
attestationPool: AttestationPool
mainchainMonitor: MainchainMonitor
beaconClock: BeaconClock
stateCache: StateData ##\
## State cache object that's used as a scratch pad
## TODO this is pretty dangerous - for example if someone sets it
## to a particular state then does `await`, it might change - prone to
## async races
justifiedStateCache: StateData ##\
## A second state cache that's used during head selection, to avoid
## state replaying.
# TODO Something smarter, so we don't need to keep two full copies, wasteful
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
func localValidatorsDir(conf: BeaconNodeConf): string =
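
The stateCache TODO is worth a concrete picture: anything placed in the shared scratch pad before an await may have been replaced by the time the proc resumes. A minimal self-contained sketch of that hazard (hypothetical names, chronos assumed for the async runtime):

import chronos

type
  State = object        # stands in for StateData
    slot: int
  Node = ref object     # stands in for BeaconNode
    stateCache: State

proc racy(node: Node) {.async.} =
  node.stateCache = State(slot: 1)  # set the shared scratch pad
  await sleepAsync(1.seconds)       # suspension point: other tasks may run
  # another task may have rewritten stateCache while this proc slept,
  # so there is no guarantee this still prints 1
  echo node.stateCache.slot
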
@ -81,7 +109,7 @@ proc getStateFromSnapshot(node: BeaconNode, state: var BeaconState): bool =
dataDir = conf.dataDir.string, snapshot = snapshotPath
quit 1
else:
error "missing genesis file"
debug "No genesis file in data directory", genesisPath
writeGenesisFile = true
genesisPath = snapshotPath
else:
@ -93,16 +121,19 @@ proc getStateFromSnapshot(node: BeaconNode, state: var BeaconState): bool =
try:
state = SSZ.decode(snapshotContents, BeaconState)
except SerializationError as err:
except SerializationError:
error "Failed to import genesis file", path = genesisPath
quit 1
info "Loaded genesis state", path = genesisPath
if writeGenesisFile:
try:
error "writing genesis file", path = conf.dataDir/genesisFile
notice "Writing genesis to data directory", path = conf.dataDir/genesisFile
writeFile(conf.dataDir/genesisFile, snapshotContents.string)
except CatchableError as err:
error "Failed to persist genesis file to data dir", err = err.msg
error "Failed to persist genesis file to data dir",
err = err.msg, genesisFile = conf.dataDir/genesisFile
quit 1
result = true
@ -131,16 +162,11 @@ proc useBootstrapFile(node: BeaconNode, bootstrapFile: string) =
proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} =
new result
result.onBeaconBlock = onBeaconBlock
result.config = conf
result.networkIdentity = getPersistentNetIdentity(conf)
result.nickname = if conf.nodeName == "auto": shortForm(result.networkIdentity)
else: conf.nodeName
template fail(args: varargs[untyped]) =
stderr.write args, "\n"
quit 1
for bootNode in conf.bootstrapNodes:
result.addBootstrapNode BootstrapAddr.init(bootNode)
@ -164,22 +190,25 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
var eth1MonitorStartBlock: Eth2Digest
if result.db.getHeadBlock().isNone():
var state: BeaconState
if not result.getStateFromSnapshot(state):
var state = new BeaconState
# TODO getStateFromSnapshot never returns false - it quits..
if not result.getStateFromSnapshot(state[]):
if conf.depositWeb3Url.len != 0:
result.mainchainMonitor = MainchainMonitor.init(conf.depositWeb3Url, conf.depositContractAddress, eth1MonitorStartBlock)
result.mainchainMonitor = MainchainMonitor.init(
conf.depositWeb3Url, conf.depositContractAddress, eth1MonitorStartBlock)
result.mainchainMonitor.start()
else:
stderr.write "No state snapshot (or web3 URL) provided\n"
quit 1
state = await result.mainchainMonitor.getGenesis()
state[] = await result.mainchainMonitor.getGenesis()
else:
eth1MonitorStartBlock = state.eth1Data.block_hash
result.commitGenesisState(state)
result.commitGenesisState(state[])
if result.mainchainMonitor.isNil and conf.depositWeb3Url.len != 0:
result.mainchainMonitor = MainchainMonitor.init(conf.depositWeb3Url, conf.depositContractAddress, eth1MonitorStartBlock)
result.mainchainMonitor = MainchainMonitor.init(
conf.depositWeb3Url, conf.depositContractAddress, eth1MonitorStartBlock)
result.mainchainMonitor.start()
result.blockPool = BlockPool.init(result.db)
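
The switch from a stack-allocated BeaconState to new BeaconState is deliberate: the state is a very large object, and keeping it behind a ref avoids big stack copies; the value is then reached through the [] dereference, as in state[] = await result.mainchainMonitor.getGenesis(). A self-contained sketch of the idiom:

type Big = object
  data: array[1_000_000, byte]   # stands in for a large BeaconState

proc firstByte(b: Big): byte = b.data[0]

var state = new Big        # heap cell: no multi-megabyte stack copy
state[].data[0] = 42       # mutate through the dereference
echo firstByte(state[])    # pass the value where a Big is expected
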
@ -191,8 +220,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
# TODO sync is called when a remote peer is connected - is that the right
# time to do so?
let sync = result.network.protocolState(BeaconSync)
sync.node = result
sync.db = result.db
sync.init(
result.blockPool, result.forkVersion,
proc(blck: BeaconBlock) = onBeaconBlock(result, blck))
result.stateCache = result.blockPool.loadTailState()
result.justifiedStateCache = result.stateCache
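
The sync protocol no longer stores the whole BeaconNode: init hands it only the block pool, the fork version, and a block callback. Since onBeaconBlock still takes the node as its first argument, a closure adapts it to the node-free callback signature. A self-contained sketch of that capture pattern (stub types, not the real ones):

type
  Blck = object                     # stands in for BeaconBlock
  Node = ref object                 # stands in for BeaconNode
  BlockCallback = proc(b: Blck) {.gcsafe.}

proc onBlock(node: Node, b: Blck) = echo "block handled"

proc makeCallback(node: Node): BlockCallback =
  # the closure captures node, so callers see only the narrow signature
  result = proc(b: Blck) = onBlock(node, b)

let cb = makeCallback(Node())
cb(Blck())
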
@ -353,24 +383,19 @@ proc proposeBlock(node: BeaconNode,
doAssert false, "head slot matches proposal slot (!)"
# return
# Get eth1data which may be async
# TODO it's a bad idea to get eth1data async because that might delay block
# production
let (eth1data, deposits) = node.blockPool.withState(
node.stateCache, BlockSlot(blck: head, slot: slot - 1)):
if node.mainchainMonitor.isNil:
let e1d =
get_eth1data_stub(
state.eth1_deposit_index, slot.compute_epoch_at_slot())
(e1d, newSeq[Deposit]())
else:
let e1d = node.mainchainMonitor.eth1Data
(e1d, node.mainchainMonitor.getPendingDeposits())
# Advance state to the slot immediately preceding the one we're creating a
# block for - potentially we will be processing empty slots along the way.
let (nroot, nblck) = node.blockPool.withState(
node.stateCache, BlockSlot(blck: head, slot: slot - 1)):
let (eth1data, deposits) =
if node.mainchainMonitor.isNil:
(get_eth1data_stub(
state.eth1_deposit_index, slot.compute_epoch_at_slot()),
newSeq[Deposit]())
else:
(node.mainchainMonitor.eth1Data,
node.mainchainMonitor.getPendingDeposits())
# To create a block, we'll first apply a partial block to the state, skipping
# some validations.
let
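
Eth1 data selection now happens inside the same withState pass that advances the state for block production, so the potentially expensive slot replay runs once instead of twice. The branch itself is Nim's expression-if returning a tuple; a self-contained sketch of that idiom:

proc pendingDeposits(): seq[int] = @[1, 2, 3]   # stand-in for the monitor

let haveMonitor = false
let (eth1data, deposits) =
  if not haveMonitor:
    (0, newSeq[int]())       # stub values when no monitor is attached
  else:
    (42, pendingDeposits())
echo eth1data, " ", deposits.len
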

View File

@ -1,45 +1,10 @@
import
sets, deques, tables, options,
deques, tables, options,
stew/[endians2],
spec/[datatypes, crypto, digest],
beacon_chain_db, conf, mainchain_monitor, eth2_network, time
beacon_chain_db
type
# #############################################
#
# Beacon Node
#
# #############################################
BeaconNode* = ref object
nickname*: string
network*: Eth2Node
forkVersion*: array[4, byte]
networkIdentity*: Eth2NodeIdentity
requestManager*: RequestManager
isBootstrapNode*: bool
bootstrapNodes*: seq[BootstrapAddr]
db*: BeaconChainDB
config*: BeaconNodeConf
attachedValidators*: ValidatorPool
blockPool*: BlockPool
attestationPool*: AttestationPool
mainchainMonitor*: MainchainMonitor
beaconClock*: BeaconClock
onBeaconBlock*: proc (node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
stateCache*: StateData ##\
## State cache object that's used as a scratch pad
## TODO this is pretty dangerous - for example if someone sets it
## to a particular state then does `await`, it might change - prone to
## async races
justifiedStateCache*: StateData ##\
## A second state cache that's used during head selection, to avoid
## state replaying.
# TODO Something smarter, so we don't need to keep two full copies, wasteful
# #############################################
#
# Attestation Pool
@ -238,9 +203,6 @@ type
ValidatorPool* = object
validators*: Table[ValidatorPubKey, AttachedValidator]
RequestManager* = object
network*: Eth2Node
FetchRecord* = object
root*: Eth2Digest
historySlots*: uint64

View File

@ -1,4 +0,0 @@
import
spec/datatypes, beacon_node_types
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
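
This deleted file existed only to forward-declare onBeaconBlock and break an import cycle; with the BeaconNode type now private to beacon_node.nim, an in-module forward declaration (added earlier in this diff) is enough. A self-contained sketch of the mechanism:

type Node = ref object

proc ping(n: Node) {.gcsafe.}       # forward declaration: body comes later

proc pong(n: Node) =
  n.ping()                          # callable before the body appears

proc ping(n: Node) = echo "ping"    # body supplied later in the same module
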

View File

@ -73,12 +73,15 @@ proc processDeposits(m: MainchainMonitor, web3: Web3) {.async.} =
inc m.depositCount
m.eth1Block = blkHash
if m.pendingDeposits.len >= SLOTS_PER_EPOCH and m.pendingDeposits.len >= MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and blk.timestamp.uint64 >= MIN_GENESIS_TIME.uint64:
if m.pendingDeposits.len >= SLOTS_PER_EPOCH and
m.pendingDeposits.len >= MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and
blk.timestamp.uint64 >= MIN_GENESIS_TIME.uint64:
# This block is a genesis candidate
var h: Eth2Digest
h.data = array[32, byte](blkHash)
let startTime = blk.timestamp.uint64
var s = initialize_beacon_state_from_eth1(h, startTime, m.pendingDeposits, {skipValidation})
var s = initialize_beacon_state_from_eth1(
h, startTime, m.pendingDeposits, {skipValidation})
if is_valid_genesis_state(s):
# https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
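
The reflowed condition is the genesis trigger: enough pending deposits and a block timestamp past the minimum genesis time. A self-contained sketch with illustrative constants (not the real spec values):

const
  SLOTS_PER_EPOCH = 8
  MIN_GENESIS_ACTIVE_VALIDATOR_COUNT = 64
  MIN_GENESIS_TIME = 1_578_009_600'u64

proc isGenesisCandidate(deposits: int, timestamp: uint64): bool =
  deposits >= SLOTS_PER_EPOCH and
    deposits >= MIN_GENESIS_ACTIVE_VALIDATOR_COUNT and
    timestamp >= MIN_GENESIS_TIME

echo isGenesisCandidate(100, 1_600_000_000'u64)   # -> true
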
@ -131,7 +134,11 @@ proc run(m: MainchainMonitor, delayBeforeStart: Duration) {.async.} =
let ns = web3.contractSender(DepositContract, m.depositContractAddress)
let s = await ns.subscribe(DepositEvent, %*{"fromBlock": startBlkNum}) do(pubkey: Bytes48, withdrawalCredentials: Bytes32, amount: Bytes8, signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode):
let s = await ns.subscribe(DepositEvent, %*{"fromBlock": startBlkNum}) do(
pubkey: Bytes48,
withdrawalCredentials: Bytes32,
amount: Bytes8,
signature: Bytes96, merkleTreeIndex: Bytes8, j: JsonNode):
let blkHash = BlockHash.fromHex(j["blockHash"].getStr())
let amount = bytes_to_int(array[8, byte](amount))

View File

@ -5,6 +5,10 @@ import
eth2_network, beacon_node_types, sync_protocol,
eth/async_utils
type
RequestManager* = object
network*: Eth2Node
proc init*(T: type RequestManager, network: Eth2Node): T =
T(network: network)
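
RequestManager moves out of beacon_node_types and gains an explicit constructor, using Nim's usual T: type parameter idiom. A self-contained sketch:

type
  Net = ref object          # stands in for Eth2Node
  Manager = object
    network: Net

proc init(T: type Manager, network: Net): T =
  T(network: network)

let rm = Manager.init(Net())   # explicit construction, no hidden globals
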

View File

@ -395,7 +395,7 @@ func getFinalHash*(merkelizer: SszChunksMerkelizer): Eth2Digest =
let HashingStreamVTable = OutputStreamVTable(
writePage: proc (s: OutputStreamVar, data: openarray[byte])
{.nimcall, gcsafe, raises: [IOError, Defect].} =
{.nimcall, gcsafe, raises: [IOError].} =
trs "ADDING STREAM CHUNK ", data
SszChunksMerkelizer(s.outputDevice).addChunk(data)
,
@ -407,9 +407,8 @@ func getVtableAddresWithoutSideEffect: ptr OutputStreamVTable =
# TODO this is a work-around for the somewhat broken side
# effects analysis of Nim - reading from global let variables
# is considered a side-effect.
# Nim 0.19 doesn't have the `{.noSideEffect.}:` override, so
# we should revisit this in Nim 0.20.2.
{.emit: "`result` = &`HashingStreamVTable`;".}
{.noSideEffect.}:
unsafeAddr HashingStreamVTable
func newSszHashingStream(merkelizer: SszChunksMerkelizer): ref OutputStream =
new result
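
The emit hack is replaced by a noSideEffect block: Nim's effect analysis treats reading a global let as a side effect, so a func must explicitly vouch for the read before returning the address. A self-contained sketch of the override (spelled as in this commit; newer Nim prefers cast(noSideEffect)):

let vtable = [1, 2, 3]          # global let: reads count as side effects

func vtableAddr(): ptr array[3, int] =
  {.noSideEffect.}:             # vouch that this read is effectively pure
    unsafeAddr vtable

echo vtableAddr()[][0]          # -> 1
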

View File

@ -60,7 +60,7 @@ func indexableNavigatorImpl[T](m: MemRange, idx: int): MemRange =
getMemRange(typedNavigator[idx])
func fieldNavigatorImpl[RecordType; FieldType;
fieldName: static string](m: MemRange): MemRange {.raises: [Defect, MalformedSszError].} =
fieldName: static string](m: MemRange): MemRange {.raises: [MalformedSszError].} =
# TODO: Make sure this doesn't fail with a Defect when
# navigating to an inactive field in a case object.
var typedNavigator = sszMount(m, RecordType)

View File

@ -2,7 +2,7 @@ import
options, tables, sets, macros,
chronicles, chronos, metrics, stew/ranges/bitranges,
spec/[datatypes, crypto, digest, helpers], eth/rlp,
beacon_node_types, eth2_network, beacon_chain_db, block_pool, ssz
beacon_node_types, eth2_network, block_pool, ssz
when networkBackend == rlpxBackend:
import eth/rlp/options as rlpOptions
@ -25,11 +25,11 @@ type
else:
index: uint32
ValidatorSet = seq[Validator]
BeaconBlockCallback* = proc(blck: BeaconBlock) {.gcsafe.}
BeaconSyncNetworkState* = ref object
node*: BeaconNode
db*: BeaconChainDB
blockPool*: BlockPool
forkVersion*: array[4, byte]
onBeaconBlock*: BeaconBlockCallback
BeaconSyncPeerState* = ref object
initialStatusReceived: bool
@ -40,29 +40,18 @@ type
const
MAX_REQUESTED_BLOCKS = 20'u64
MaxAncestorBlocksResponse = 256
func toHeader(b: BeaconBlock): BeaconBlockHeader =
BeaconBlockHeader(
slot: b.slot,
parent_root: b.parent_root,
state_root: b.state_root,
body_root: hash_tree_root(b.body),
signature: b.signature
)
func init*(
v: BeaconSyncNetworkState, blockPool: BlockPool,
forkVersion: array[4, byte], onBeaconBlock: BeaconBlockCallback) =
v.blockPool = blockPool
v.forkVersion = forkVersion
v.onBeaconBlock = onBeaconBlock
proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeader, body: BeaconBlockBody) =
doAssert(hash_tree_root(body) == h.body_root)
b.slot = h.slot
b.parent_root = h.parent_root
b.state_root = h.state_root
b.body = body
b.signature = h.signature
proc importBlocks(node: BeaconNode,
blocks: openarray[BeaconBlock]) =
proc importBlocks(state: BeaconSyncNetworkState,
blocks: openarray[BeaconBlock]) {.gcsafe.} =
for blk in blocks:
node.onBeaconBlock(node, blk)
state.onBeaconBlock(blk)
info "Forward sync imported blocks", len = blocks.len
type
@ -73,9 +62,9 @@ type
headRoot*: Eth2Digest
headSlot*: Slot
proc getCurrentStatus(node: BeaconNode): StatusMsg =
proc getCurrentStatus(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} =
let
blockPool = node.blockPool
blockPool = state.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
headRoot = headBlock.root
@ -83,14 +72,14 @@ proc getCurrentStatus(node: BeaconNode): StatusMsg =
finalizedEpoch = finalizedHead.slot.compute_epoch_at_slot()
StatusMsg(
fork_version: node.forkVersion,
fork_version: state.forkVersion,
finalizedRoot: finalizedHead.blck.root,
finalizedEpoch: finalizedEpoch,
headRoot: headRoot,
headSlot: headSlot)
proc handleInitialStatus(peer: Peer,
node: BeaconNode,
state: BeaconSyncNetworkState,
ourStatus: StatusMsg,
theirStatus: StatusMsg) {.async, gcsafe.}
@ -102,14 +91,13 @@ p2pProtocol BeaconSync(version = 1,
onPeerConnected do (peer: Peer):
if peer.wasDialed:
let
node = peer.networkState.node
ourStatus = node.getCurrentStatus
ourStatus = peer.networkState.getCurrentStatus()
# TODO: The timeout here is so high only because we fail to
# respond in time due to high CPU load in our single thread.
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
if theirStatus.isSome:
await peer.handleInitialStatus(node, ourStatus, theirStatus.get)
await peer.handleInitialStatus(peer.networkState, ourStatus, theirStatus.get)
else:
warn "Status response not received in time"
@ -119,14 +107,13 @@ p2pProtocol BeaconSync(version = 1,
requestResponse:
proc status(peer: Peer, theirStatus: StatusMsg) {.libp2pProtocol("status", 1).} =
let
node = peer.networkState.node
ourStatus = node.getCurrentStatus
ourStatus = peer.networkState.getCurrentStatus()
await response.send(ourStatus)
if not peer.state.initialStatusReceived:
peer.state.initialStatusReceived = true
await peer.handleInitialStatus(node, ourStatus, theirStatus)
await peer.handleInitialStatus(peer.networkState, ourStatus, theirStatus)
proc statusResp(peer: Peer, msg: StatusMsg)
@ -144,7 +131,7 @@ p2pProtocol BeaconSync(version = 1,
if count > 0'u64:
let count = if step != 0: min(count, MAX_REQUESTED_BLOCKS.uint64) else: 1
let pool = peer.networkState.node.blockPool
let pool = peer.networkState.blockPool
var results: array[MAX_REQUESTED_BLOCKS, BlockRef]
let
lastPos = min(count.int, results.len) - 1
@ -159,8 +146,7 @@ p2pProtocol BeaconSync(version = 1,
blockRoots: openarray[Eth2Digest]) {.
libp2pProtocol("beacon_blocks_by_root", 1).} =
let
pool = peer.networkState.node.blockPool
db = peer.networkState.db
pool = peer.networkState.blockPool
for root in blockRoots:
let blockRef = pool.getRef(root)
@ -172,11 +158,13 @@ p2pProtocol BeaconSync(version = 1,
blocks: openarray[BeaconBlock])
proc handleInitialStatus(peer: Peer,
node: BeaconNode,
state: BeaconSyncNetworkState,
ourStatus: StatusMsg,
theirStatus: StatusMsg) {.async, gcsafe.} =
if theirStatus.forkVersion != node.forkVersion:
if theirStatus.forkVersion != state.forkVersion:
notice "Irrelevant peer",
peer, theirFork = theirStatus.forkVersion, ourFork = state.forkVersion
await peer.disconnect(IrrelevantNetwork)
return
@ -221,7 +209,7 @@ proc handleInitialStatus(peer: Peer,
info "Got 0 blocks while syncing", peer
break
node.importBlocks blocks.get
state.importBlocks(blocks.get)
let lastSlot = blocks.get[^1].slot
if lastSlot <= s:
info "Slot did not advance during sync", peer
@ -231,7 +219,8 @@ proc handleInitialStatus(peer: Peer,
# TODO: Maybe this shouldn't happen so often.
# The alternative could be setting up a timer here.
let statusResp = await peer.status(node.getCurrentStatus)
let statusResp = await peer.status(state.getCurrentStatus())
if statusResp.isSome:
theirStatus = statusResp.get
else:
@ -245,4 +234,3 @@ proc handleInitialStatus(peer: Peer,
except CatchableError:
warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg()

View File

@ -1,5 +1,5 @@
import
strformat, ospaths, confutils
strformat, os, confutils
type
Command = enum

View File

@ -1,5 +1,5 @@
import
confutils, ospaths, strutils, chronicles, json_serialization,
confutils, os, strutils, chronicles, json_serialization,
nimcrypto/utils,
../beacon_chain/spec/[crypto, datatypes, digest],
../beacon_chain/[ssz]

View File

@ -1,5 +1,5 @@
import
confutils, ospaths, strutils, chronicles, json_serialization,
confutils, os, strutils, chronicles, json_serialization,
../beacon_chain/spec/[crypto, datatypes, digest],
../beacon_chain/[ssz]

View File

@ -1,5 +1,5 @@
import
confutils, ospaths, strutils, chronicles, json_serialization,
confutils, os, strutils, chronicles, json_serialization,
nimcrypto/utils,
../beacon_chain/spec/[crypto, datatypes, digest],
../beacon_chain/[ssz]

View File

@ -56,7 +56,7 @@ cli do (testnetName {.argument.}: string):
dataDirName = testnetName.replace("/", "_")
dataDir = buildDir / "data" / dataDirName
beaconNodeBinary = buildDir / "beacon_node_" & dataDirName
nimFlags = "-d:release --lineTrace:on -d:chronicles_log_level=DEBUG " & getEnv("NIM_PARAMS")
nimFlags = "-d:chronicles_log_level=DEBUG " & getEnv("NIM_PARAMS")
var depositContractOpt = ""
let depositContractFile = testnetDir / depositContractFile