guard node.state (#237)

* fix node.state being used unsafely across async callbacks (fixes #230)
* node.state -> node.stateCache
* fix attestation validation not to use asserts
* re-enable attestation pool tests
* prepare for resizing validator registry
This commit is contained in:
Jacek Sieka 2019-04-06 01:46:07 -06:00 committed by Mamy Ratsimbazafy
parent c53de3e550
commit 9a2f3176f7
4 changed files with 147 additions and 129 deletions

View File

@ -26,9 +26,6 @@ proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
import sync_protocol, request_manager
# #################################################
func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
($node.state.data.validator_registry[validatorIdx].pubkey)[0..7]
func localValidatorsDir(conf: BeaconNodeConf): string =
conf.dataDir / "validators"
@ -174,12 +171,27 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
let head = result.blockPool.get(result.db.getHeadBlock().get())
result.state = result.blockPool.loadTailState()
result.justifiedStateCache = result.state
result.stateCache = result.blockPool.loadTailState()
result.justifiedStateCache = result.stateCache
let addressFile = string(conf.dataDir) / "beacon_node.address"
result.network.saveConnectionAddressFile(addressFile)
result.beaconClock = BeaconClock.init(result.state.data)
result.beaconClock = BeaconClock.init(result.stateCache.data)
template withState(
pool: BlockPool, cache: var StateData, blockSlot: BlockSlot, body: untyped): untyped =
## Helper template that updates state to a particular BlockSlot - usage of
## cache is unsafe outside of block.
## TODO async transformations will lead to a race where cache gets updated
## while waiting for future to complete - catch this here somehow?
updateState(pool, cache, blockSlot)
template state(): BeaconState {.inject.} = cache.data
template blck(): BlockRef {.inject.} = cache.blck
template root(): Eth2Digest {.inject.} = cache.root
body
proc connectToNetwork(node: BeaconNode) {.async.} =
let localKeys = ensureNetworkKeys(node.config)
@ -214,46 +226,45 @@ template findIt(s: openarray, predicate: untyped): int =
break
res
proc addLocalValidator(node: BeaconNode, validatorKey: ValidatorPrivKey) =
let pubKey = validatorKey.pubKey()
proc addLocalValidator(
node: BeaconNode, state: BeaconState, privKey: ValidatorPrivKey) =
let pubKey = privKey.pubKey()
let idx = node.state.data.validator_registry.findIt(it.pubKey == pubKey)
let idx = state.validator_registry.findIt(it.pubKey == pubKey)
if idx == -1:
warn "Validator not in registry", pubKey
else:
debug "Attaching validator", validator = shortValidatorKey(node, idx),
idx, pubKey
node.attachedValidators.addLocalValidator(idx, pubKey, validatorKey)
node.attachedValidators.addLocalValidator(pubKey, privKey)
proc addLocalValidators(node: BeaconNode) =
proc addLocalValidators(node: BeaconNode, state: BeaconState) =
for validatorKeyFile in node.config.validators:
node.addLocalValidator validatorKeyFile.load
node.addLocalValidator state, validatorKeyFile.load
for kind, file in walkDir(node.config.localValidatorsDir):
if kind in {pcFile, pcLinkToFile}:
node.addLocalValidator ValidatorPrivKey.init(readFile(file).string)
node.addLocalValidator state, ValidatorPrivKey.init(readFile(file).string)
info "Local validators attached ", count = node.attachedValidators.count
proc getAttachedValidator(node: BeaconNode, idx: int): AttachedValidator =
let validatorKey = node.state.data.validator_registry[idx].pubkey
return node.attachedValidators.getValidator(validatorKey)
proc getAttachedValidator(
node: BeaconNode, state: BeaconState, idx: int): AttachedValidator =
let validatorKey = state.validator_registry[idx].pubkey
node.attachedValidators.getValidator(validatorKey)
proc updateHead(node: BeaconNode, slot: Slot): BlockRef =
# Use head state for attestation resolution below
# TODO do we need to resolve attestations using all available head states?
node.blockPool.updateState(
node.state, BlockSlot(blck: node.blockPool.head, slot: slot))
node.blockPool.withState(
node.stateCache, BlockSlot(blck: node.blockPool.head, slot: slot)):
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(state)
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(node.state.data)
# TODO move all of this logic to BlockPool
debug "Preparing for fork choice",
stateRoot = shortLog(node.state.root),
connectedPeers = node.network.connectedPeers,
stateSlot = humaneSlotNum(node.state.data.slot),
stateEpoch = humaneEpochNum(node.state.data.slot.slotToEpoch)
# TODO move all of this logic to BlockPool
debug "Preparing for fork choice",
stateRoot = shortLog(root),
connectedPeers = node.network.connectedPeers,
stateSlot = humaneSlotNum(state.slot),
stateEpoch = humaneEpochNum(state.slot.slotToEpoch)
let
justifiedHead = node.blockPool.latestJustifiedBlock()
@ -261,37 +272,25 @@ proc updateHead(node: BeaconNode, slot: Slot): BlockRef =
# TODO slot number is wrong here, it should be the start of the epoch that
# got finalized:
# https://github.com/ethereum/eth2.0-specs/issues/768
node.blockPool.updateState(
node.justifiedStateCache,
BlockSlot(blck: justifiedHead, slot: justifiedHead.slot))
let newHead = node.blockPool.withState(
node.justifiedStateCache,
BlockSlot(blck: justifiedHead, slot: justifiedHead.slot)):
let newHead = lmdGhost(
node.attestationPool, node.justifiedStateCache.data, justifiedHead)
lmdGhost(node.attestationPool, state, justifiedHead)
info "Fork chosen",
newHeadSlot = humaneSlotNum(newHead.slot),
newHeadEpoch = humaneEpochNum(newHead.slot.slotToEpoch),
newHeadBlockRoot = shortLog(newHead.root)
node.blockPool.updateHead(node.state, newHead)
node.blockPool.updateHead(node.stateCache, newHead)
newHead
proc makeAttestation(node: BeaconNode,
proc sendAttestation(node: BeaconNode,
validator: AttachedValidator,
state: BeaconState,
head: BlockRef,
shard: uint64,
attestationData: AttestationData,
committeeLen: int,
indexInCommittee: int) {.async.} =
# TODO - move that to "updateState"
# Epoch underflow - https://github.com/status-im/nim-beacon-chain/issues/207
doAssert node.state.data.current_justified_epoch != GENESIS_EPOCH - 1,
"Underflow in justified epoch field before making attestation"
let
attestationData = makeAttestationData(state, shard, head.root)
# Careful - after await. node.state (etc) might have changed in async race
validatorSignature = await validator.signAttestation(attestationData)
var aggregationBitfield = BitField.init(committeeLen)
@ -309,7 +308,7 @@ proc makeAttestation(node: BeaconNode,
info "Attestation sent",
attestationData = shortLog(attestationData),
validator = shortValidatorKey(node, validator.idx),
validator = shortLog(validator),
signature = shortLog(validatorSignature)
proc proposeBlock(node: BeaconNode,
@ -334,37 +333,40 @@ proc proposeBlock(node: BeaconNode,
doAssert false, "head slot matches proposal slot (!)"
# return
node.blockPool.updateState(node.state, BlockSlot(blck: head, slot: slot - 1))
# To create a block, we'll first apply a partial block to the state, skipping
# some validations.
let
blockBody = BeaconBlockBody(
randao_reveal: validator.genRandaoReveal(node.state.data, slot),
eth1_data: node.mainchainMonitor.getBeaconBlockRef(),
attestations:
node.attestationPool.getAttestationsForBlock(node.state.data, slot))
var (nroot, nblck) = node.blockPool.withState(
node.stateCache, BlockSlot(blck: head, slot: slot - 1)):
# To create a block, we'll first apply a partial block to the state, skipping
# some validations.
let
blockBody = BeaconBlockBody(
randao_reveal: validator.genRandaoReveal(state, slot),
eth1_data: node.mainchainMonitor.getBeaconBlockRef(),
attestations:
node.attestationPool.getAttestationsForBlock(state, slot))
var
newBlock = BeaconBlock(
slot: slot,
previous_block_root: head.root,
body: blockBody,
signature: ValidatorSig(), # we need the rest of the block first!
)
var
newBlock = BeaconBlock(
slot: slot,
previous_block_root: head.root,
body: blockBody,
signature: ValidatorSig(), # we need the rest of the block first!
)
let ok =
updateState(node.state.data, newBlock, {skipValidation})
doAssert ok # TODO: err, could this fail somehow?
node.state.root = hash_tree_root(node.state.data)
let ok = updateState(state, newBlock, {skipValidation})
doAssert ok # TODO: err, could this fail somehow?
root = hash_tree_root(state)
newBlock.state_root = node.state.root
newBlock.state_root = root
newBlock.signature =
await validator.signBlockProposal(node.state.data.fork, newBlock)
let blockRoot = signed_root(newBlock)
let blockRoot = signed_root(newBlock)
# Careful, state no longer valid after here..
newBlock.signature =
await validator.signBlockProposal(state.fork, slot, blockRoot)
let newBlockRef = node.blockPool.add(node.state, blockRoot, newBlock)
(blockRoot, newBlock)
let newBlockRef = node.blockPool.add(node.stateCache, nroot, nblck)
if newBlockRef == nil:
warn "Unable to add proposed block to block pool",
newBlock = shortLog(newBlock),
@ -374,8 +376,7 @@ proc proposeBlock(node: BeaconNode,
info "Block proposed",
blck = shortLog(newBlock),
blockRoot = shortLog(newBlockRef.root),
validator = shortValidatorKey(node, validator.idx),
idx = validator.idx
validator = shortLog(validator)
node.network.broadcast(topicBeaconBlocks, newBlock)
@ -393,9 +394,9 @@ proc onAttestation(node: BeaconNode, attestation: Attestation) =
# though - maybe we should use the state from the block pointed to by
# the attestation for some of the check? Consider interop with block
# production!
node.blockPool.updateState(node.state,
BlockSlot(blck: node.blockPool.head, slot: node.beaconClock.now().toSlot()))
node.attestationPool.add(node.state.data, attestation)
node.blockPool.withState(node.stateCache,
BlockSlot(blck: node.blockPool.head, slot: node.beaconClock.now().toSlot())):
node.attestationPool.add(state, attestation)
proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
# We received a block but don't know much about it yet - in particular, we
@ -405,7 +406,7 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
blck = shortLog(blck),
blockRoot = shortLog(blockRoot)
if node.blockPool.add(node.state, blockRoot, blck).isNil:
if node.blockPool.add(node.stateCache, blockRoot, blck).isNil:
return
# The block we received contains attestations, and we might not yet know about
@ -446,21 +447,28 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
attestationHeadRoot = shortLog(attestationHead.root),
attestationSlot = humaneSlotNum(slot)
# Collect data to send before node.stateCache grows stale
var attestations: seq[tuple[
data: AttestationData, committeeLen, indexInCommittee: int,
validator: AttachedValidator]]
# We need to run attestations exactly for the slot that we're attesting to.
# In case blocks went missing, this means advancing past the latest block
# using empty slots as fillers.
node.blockPool.updateState(
node.state, BlockSlot(blck: attestationHead, slot: slot))
node.blockPool.withState(
node.stateCache, BlockSlot(blck: attestationHead, slot: slot)):
for crosslink_committee in get_crosslink_committees_at_slot(state, slot):
for i, validatorIdx in crosslink_committee.committee:
let validator = node.getAttachedValidator(state, validatorIdx)
if validator != nil:
attestations.add (
makeAttestationData(state, crosslink_committee.shard, blck.root),
crosslink_committee.committee.len, i, validator)
for crosslink_committee in get_crosslink_committees_at_slot(
node.state.data, slot):
for i, validatorIdx in crosslink_committee.committee:
let validator = node.getAttachedValidator(validatorIdx)
if validator != nil:
traceAsyncErrors makeAttestation(
node, validator, node.state.data, head,
crosslink_committee.shard,
crosslink_committee.committee.len, i)
for a in attestations:
traceAsyncErrors sendAttestation(
node, a.validator, a.data, a.committeeLen, a.indexInCommittee)
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =
@ -471,21 +479,18 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
# proposing for it - basically, we're selecting proposer based on an
# empty slot.. wait for the committee selection to settle, then
# revisit this - we should be able to advance behind
node.blockPool.updateState(node.state, BlockSlot(blck: head, slot: slot))
node.blockPool.withState(node.stateCache, BlockSlot(blck: head, slot: slot)):
let
proposerIdx = get_beacon_proposer_index(state, slot)
validator = node.getAttachedValidator(state, proposerIdx)
let proposerIdx = get_beacon_proposer_index(node.state.data, slot)
let validator = node.getAttachedValidator(proposerIdx)
if validator != nil:
return await proposeBlock(node, validator, head, slot)
if validator != nil:
# TODO:
# Warm-up the proposer earlier to try to obtain previous
# missing blocks if necessary
return await proposeBlock(node, validator, head, slot)
debug "Expecting proposal",
headRoot = shortLog(head.root),
slot = humaneSlotNum(slot),
proposer = shortValidatorKey(node.state.data, proposerIdx)
debug "Expecting proposal",
headRoot = shortLog(head.root),
slot = humaneSlotNum(slot),
proposer = shortLog(state.validator_registry[proposerIdx].pubKey)
return head
@ -674,7 +679,7 @@ proc createPidFile(filename: string) =
gPidFile = filename
addQuitProc proc {.noconv.} = removeFile gPidFile
proc start(node: BeaconNode) =
proc start(node: BeaconNode, headState: BeaconState) =
# TODO: while it's nice to cheat by waiting for connections here, we
# actually need to make this part of normal application flow -
# losing all connections might happen at any time and we should be
@ -685,13 +690,13 @@ proc start(node: BeaconNode) =
slotsSinceFinalization =
int64(node.blockPool.finalizedHead.slot) -
int64(node.beaconClock.now()),
stateSlot = humaneSlotNum(node.state.data.slot),
stateSlot = humaneSlotNum(headState.slot),
SHARD_COUNT,
SLOTS_PER_EPOCH,
SECONDS_PER_SLOT,
SPEC_VERSION
node.addLocalValidators()
node.addLocalValidators(headState)
node.run()
when isMainModule:
@ -766,7 +771,8 @@ when isMainModule:
var node = waitFor BeaconNode.init(config)
# TODO slightly ugly to rely on node.stateCache state here..
if node.nickname != "":
dynamicLogScope(node = node.nickname): node.start()
dynamicLogScope(node = node.nickname): node.start(node.stateCache.data)
else:
node.start()
node.start(node.stateCache.data)

View File

@ -39,7 +39,7 @@ type
mainchainMonitor*: MainchainMonitor
beaconClock*: BeaconClock
state*: StateData ##\
stateCache*: StateData ##\
## State cache object that's used as a scratch pad
## TODO this is pretty dangerous - for example if someone sets it
## to a particular state then does `await`, it might change - prone to
@ -248,7 +248,8 @@ type
ValidatorConnection* = object
AttachedValidator* = ref object
idx*: int # index in the registry
pubKey*: ValidatorPubKey
case kind*: ValidatorKind
of inProcess:
privKey*: ValidatorPrivKey
@ -277,3 +278,5 @@ type
proc userValidatorsRange*(d: NetworkMetadata): HSlice[int, int] =
0 .. d.lastUserValidator.int
proc shortLog*(v: AttachedValidator): string = shortLog(v.pubKey)

View File

@ -491,7 +491,7 @@ proc checkAttestation*(
# Can't submit attestations too quickly
if not (
attestation.data.slot + MIN_ATTESTATION_INCLUSION_DELAY <= stateSlot):
warn("Can't submit attestations too quickly",
warn("Attestation too new",
attestation_slot = humaneSlotNum(attestation.data.slot),
state_slot = humaneSlotNum(stateSlot))
return
@ -549,10 +549,14 @@ proc checkAttestation*(
return
# Attestation must be nonempty!
doAssert anyIt(attestation.aggregation_bitfield.bits, it != 0)
if not anyIt(attestation.aggregation_bitfield.bits, it != 0):
warn("No signature bits")
return
# Custody must be empty (to be removed in phase 1)
doAssert allIt(attestation.custody_bitfield.bits, it == 0)
if not allIt(attestation.custody_bitfield.bits, it == 0):
warn("Custody bits set in phase0")
return
# Get the committee for the specific shard that this attestation is for
let crosslink_committee = mapIt(
@ -561,11 +565,13 @@ proc checkAttestation*(
it.committee)[0]
# Custody bitfield must be a subset of the attestation bitfield
doAssert allIt(0 ..< len(crosslink_committee),
if not get_bitfield_bit(attestation.aggregation_bitfield, it):
not get_bitfield_bit(attestation.custody_bitfield, it)
else:
true)
if not allIt(0 ..< len(crosslink_committee),
if not get_bitfield_bit(attestation.aggregation_bitfield, it):
not get_bitfield_bit(attestation.custody_bitfield, it)
else:
true):
warn("Wrong custody bits set")
return
# Verify aggregate signature
let
@ -580,7 +586,7 @@ proc checkAttestation*(
if skipValidation notin flags:
# Verify that aggregate_signature verifies using the group pubkey.
doAssert bls_verify_multiple(
if not bls_verify_multiple(
@[
bls_aggregate_pubkeys(mapIt(custody_bit_0_participants,
state.validator_registry[it].pubkey)),
@ -596,7 +602,9 @@ proc checkAttestation*(
attestation.aggregate_signature,
get_domain(state.fork, slot_to_epoch(attestation.data.slot),
DOMAIN_ATTESTATION),
)
):
warn("Invalid attestation signature")
return
# Crosslink data root is zero (to be removed in phase 1)
if attestation.data.crosslink_data_root != ZERO_HASH:

View File

@ -1,7 +1,7 @@
import
tables,
chronos,
spec/[datatypes, crypto, helpers], ssz,
chronos, chronicles,
spec/[datatypes, crypto, digest, helpers], ssz,
beacon_node_types
@ -12,24 +12,25 @@ template count*(pool: ValidatorPool): int =
pool.validators.len
proc addLocalValidator*(pool: var ValidatorPool,
idx: int,
pubKey: ValidatorPubKey,
privKey: ValidatorPrivKey) =
let v = AttachedValidator(idx: idx,
let v = AttachedValidator(pubKey: pubKey,
kind: inProcess,
privKey: privKey)
pool.validators[pubKey] = v
info "Local validator attached", pubKey, validator = shortLog(v)
proc getValidator*(pool: ValidatorPool,
validatorKey: ValidatorPubKey): AttachedValidator =
pool.validators.getOrDefault(validatorKey)
proc signBlockProposal*(v: AttachedValidator, fork: Fork,
blck: BeaconBlock): Future[ValidatorSig] {.async.} =
proc signBlockProposal*(v: AttachedValidator, fork: Fork, slot: Slot,
blockRoot: Eth2Digest): Future[ValidatorSig] {.async.} =
if v.kind == inProcess:
await sleepAsync(chronos.milliseconds(1))
result = bls_sign(v.privKey, signed_root(blck).data,
get_domain(fork, slot_to_epoch(blck.slot), DOMAIN_BEACON_BLOCK))
result = bls_sign(v.privKey, blockRoot.data,
get_domain(fork, slot_to_epoch(slot), DOMAIN_BEACON_BLOCK))
else:
# TODO:
# send RPC