fix some todo (#1645)

* remove some superfluous gcsafes
* remove getTailState (unused)
* don't store old epochrefs in blocks
* document attestation pool a bit
* remove `pcs =` cruft from log
Jacek Sieka 2020-09-14 16:50:03 +02:00 committed by Mamy Ratsimbazafy
parent 804b152d1d
commit 20dd38fdf9
15 changed files with 99 additions and 143 deletions

==================== next file ====================

@@ -65,14 +65,17 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin
     forkChoice: forkChoice
   )

-proc processAttestation(
+proc addForkChoiceVotes(
     pool: var AttestationPool, slot: Slot, participants: HashSet[ValidatorIndex],
     block_root: Eth2Digest, wallSlot: Slot) =
   # Add attestation votes to fork choice
   if (let v = pool.forkChoice.on_attestation(
     pool.chainDag, slot, block_root, participants, wallSlot);
     v.isErr):
-    warn "Couldn't process attestation", err = v.error()
+    # This indicates that the fork choice and the chain dag are out of sync -
+    # this is most likely the result of a bug, but we'll try to keep going -
+    # hopefully the fork choice will heal itself over time.
+    error "Couldn't add attestation to fork choice, bug?", err = v.error()

 func candidateIdx(pool: AttestationPool, slot: Slot): Option[uint64] =
   if slot >= pool.startingSlot and
@@ -103,7 +106,15 @@ proc addAttestation*(pool: var AttestationPool,
                      attestation: Attestation,
                      participants: HashSet[ValidatorIndex],
                      wallSlot: Slot) =
-  # Add an attestation whose parent we know
+  ## Add an attestation to the pool, assuming it's been validated already.
+  ## Attestations may be either aggregated or not - we're pursuing an eager
+  ## strategy where we'll drop validations we already knew about and combine
+  ## the new attestation with an existing one if possible.
+  ##
+  ## This strategy is not optimal in the sense that it would be possible to find
+  ## a better packing of attestations by delaying the aggregation, but because
+  ## it's possible to include more than one aggregate in a block we can be
+  ## somewhat lazy instead of looking for a perfect packing.

   logScope:
     attestation = shortLog(attestation)
@@ -111,7 +122,7 @@ proc addAttestation*(pool: var AttestationPool,

   let candidateIdx = pool.candidateIdx(attestation.data.slot)
   if candidateIdx.isNone:
-    debug "Attestation slot out of range",
+    debug "Skipping old attestation for block production",
       startingSlot = pool.startingSlot
     return
@@ -129,11 +140,7 @@ proc addAttestation*(pool: var AttestationPool,
           # The validations in the new attestation are a subset of one of the
           # attestations that we already have on file - no need to add this
           # attestation to the database
-          # TODO what if the new attestation is useful for creating bigger
-          # sets by virtue of not overlapping with some other attestation
-          # and therefore being useful after all?
-          trace "Ignoring subset attestation",
-            newParticipants = participants
+          trace "Ignoring subset attestation", newParticipants = participants
          found = true
          break
@@ -141,14 +148,13 @@ proc addAttestation*(pool: var AttestationPool,
        # Attestations in the pool that are a subset of the new attestation
        # can now be removed per same logic as above
-       trace "Removing subset attestations",
-         newParticipants = participants
+       trace "Removing subset attestations", newParticipants = participants

        a.validations.keepItIf(
          not it.aggregation_bits.isSubsetOf(validation.aggregation_bits))
        a.validations.add(validation)

-       pool.processAttestation(
+       pool.addForkChoiceVotes(
          attestation.data.slot, participants, attestation.data.beacon_block_root,
          wallSlot)
@@ -165,7 +171,7 @@ proc addAttestation*(pool: var AttestationPool,
       data: attestation.data,
       validations: @[validation]
     ))

-    pool.processAttestation(
+    pool.addForkChoiceVotes(
       attestation.data.slot, participants, attestation.data.beacon_block_root,
       wallSlot)
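The eager-aggregation strategy documented in the new doc comment can be illustrated with a small, self-contained sketch. This is a hypothetical toy model, not the pool's real code: a plain seq[bool] stands in for CommitteeValidatorsBits, the Validation/addValidation names are invented, and signatures are ignored.

# Toy model of the "drop subsets, replace supersets" rule (assumed names).
import std/sequtils

type Validation = object
  bits: seq[bool]   # one flag per committee member, like aggregation_bits

func isSubsetOf(a, b: seq[bool]): bool =
  # every participant flagged in `a` is also flagged in `b`
  for i in 0 ..< a.len:
    if a[i] and not b[i]:
      return false
  true

proc addValidation(validations: var seq[Validation], incoming: Validation) =
  for v in validations:
    if incoming.bits.isSubsetOf(v.bits):
      return                                  # nothing new - drop it
  # the incoming validation may subsume older entries - remove those first
  validations.keepItIf(not it.bits.isSubsetOf(incoming.bits))
  validations.add(incoming)

when isMainModule:
  var pool: seq[Validation]
  pool.addValidation(Validation(bits: @[true, false, false]))
  pool.addValidation(Validation(bits: @[true, true, false]))   # subsumes the first
  pool.addValidation(Validation(bits: @[true, false, false]))  # subset - ignored
  doAssert pool.len == 1 and pool[0].bits == @[true, true, false]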
@@ -183,18 +189,16 @@ proc addForkChoice*(pool: var AttestationPool,
     pool.chainDag, epochRef, blckRef, blck, wallSlot)

   if state.isErr:
-    # TODO If this happens, it is effectively a bug - the BlockRef structure
-    # guarantees that the DAG is valid and the state transition should
-    # guarantee that the justified and finalized epochs are ok! However,
-    # we'll log it for now to avoid crashes
-    error "Unexpected error when applying block",
+    # This indicates that the fork choice and the chain dag are out of sync -
+    # this is most likely the result of a bug, but we'll try to keep going -
+    # hopefully the fork choice will heal itself over time.
+    error "Couldn't add block to fork choice, bug?",
       blck = shortLog(blck), err = state.error

 proc getAttestationsForSlot*(pool: AttestationPool, newBlockSlot: Slot):
     Option[AttestationsSeen] =
   if newBlockSlot < (GENESIS_SLOT + MIN_ATTESTATION_INCLUSION_DELAY):
-    debug "Too early for attestations",
-      newBlockSlot = shortLog(newBlockSlot)
+    debug "Too early for attestations", newBlockSlot = shortLog(newBlockSlot)
     return none(AttestationsSeen)

   let
@@ -210,13 +214,10 @@ proc getAttestationsForSlot*(pool: AttestationPool, newBlockSlot: Slot):
   some(pool.candidates[candidateIdx.get()])

 proc getAttestationsForBlock*(pool: AttestationPool,
-                              state: BeaconState): seq[Attestation] =
+                              state: BeaconState,
+                              cache: var StateCache): seq[Attestation] =
   ## Retrieve attestations that may be added to a new block at the slot of the
   ## given state
-  logScope: pcs = "retrieve_attestation"
-
-  # TODO this shouldn't really need state -- it's to recheck/validate, but that
-  # should be refactored
   let newBlockSlot = state.slot
   var attestations: seq[AttestationEntry]
@@ -236,7 +237,6 @@ proc getAttestationsForBlock*(pool: AttestationPool,
   if attestations.len == 0:
     return

-  var cache = StateCache()
   for a in attestations:
     var
       # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#construct-attestation
@@ -249,14 +249,10 @@ proc getAttestationsForBlock*(pool: AttestationPool,
       agg {.noInit.}: AggregateSignature
     agg.init(a.validations[0].aggregate_signature)

-    # TODO what's going on here is that when producing a block, we need to
-    # include only such attestations that will not cause block validation
-    # to fail. How this interacts with voting and the acceptance of
-    # attestations into the pool in general is an open question that needs
-    # revisiting - for example, when attestations are added, against which
-    # state should they be validated, if at all?
-    # TODO we're checking signatures here every time which is very slow and we don't want
-    # to include a broken attestation
+    # Signature verification here is more of a sanity check - it could
+    # be optimized away, though for now it's easier to reuse the logic from
+    # the state transition function to ensure that the new block will not
+    # fail application.
     if (let v = check_attestation(state, attestation, {}, cache); v.isErr):
       warn "Attestation no longer validates...",
         attestation = shortLog(attestation),
@@ -265,13 +261,6 @@ proc getAttestationsForBlock*(pool: AttestationPool,
       continue

     for i in 1..a.validations.high:
-      # TODO We need to select a set of attestations that maximise profit by
-      # adding the largest combined attestation set that we can find - this
-      # unfortunately looks an awful lot like
-      # https://en.wikipedia.org/wiki/Set_packing - here we just iterate
-      # and naively add as much as possible in one go, by we could also
-      # add the same attestation data twice, as long as there's at least
-      # one new attestation in there
       if not attestation.aggregation_bits.overlaps(
           a.validations[i].aggregation_bits):
         attestation.aggregation_bits.combine(a.validations[i].aggregation_bits)
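The loop above is the naive packing the removed TODO alluded to: later validations are folded in greedily whenever their bits don't overlap, rather than solving the underlying set-packing problem. A hedged, self-contained sketch of that greedy rule, again with seq[bool] standing in for the real aggregation-bits type and without signature aggregation:

# Greedy merge of non-overlapping participation sets (illustrative only).
func overlaps(a, b: seq[bool]): bool =
  for i in 0 ..< min(a.len, b.len):
    if a[i] and b[i]:
      return true
  false

func combine(a: var seq[bool], b: seq[bool]) =
  for i in 0 ..< min(a.len, b.len):
    if b[i]:
      a[i] = true

when isMainModule:
  var packed = @[true, false, false, false]        # validations[0]
  let others = @[
    @[false, true, false, false],                  # disjoint - merged in
    @[true, false, false, true]]                   # overlaps - skipped
  for v in others:
    if not packed.overlaps(v):
      packed.combine(v)
  doAssert packed == @[true, true, false, false]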
@@ -333,4 +322,6 @@ proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef =

 proc prune*(pool: var AttestationPool) =
   if (let v = pool.forkChoice.prune(); v.isErr):
-    error "Pruning failed", err = v.error() # TODO should never happen
+    # If pruning fails, it's likely the result of a bug - this shouldn't happen
+    # but we'll keep running hoping that the fork choice will recover eventually
+    error "Couldn't prune fork choice, bug?", err = v.error()

==================== next file ====================

@@ -404,15 +404,12 @@ proc addMessageHandlers(node: BeaconNode): Future[void] =
     node.getAttestationHandlers()
   )

-proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
+proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   ## Called at the beginning of a slot - usually every slot, but sometimes might
   ## skip a few in case we're running late.
   ## lastSlot: the last slot that we successfully processed, so we know where to
   ##           start work from
   ## scheduledSlot: the slot that we were aiming for, in terms of timing
-  logScope: pcs = "slot_start"
-
   let
     # The slot we should be at, according to the clock
     beaconTime = node.beaconClock.now()
@@ -598,12 +595,12 @@ proc startSyncManager(node: BeaconNode) =
   func getLocalHeadSlot(): Slot =
     node.chainDag.head.slot

-  proc getLocalWallSlot(): Slot {.gcsafe.} =
+  proc getLocalWallSlot(): Slot =
     let epoch = node.beaconClock.now().slotOrZero.compute_epoch_at_slot() +
                 1'u64
     epoch.compute_start_slot_at_epoch()

-  func getFirstSlotAtFinalizedEpoch(): Slot {.gcsafe.} =
+  func getFirstSlotAtFinalizedEpoch(): Slot =
     let fepoch = node.chainDag.headState.data.data.finalized_checkpoint.epoch
     compute_start_slot_at_epoch(fepoch)
@@ -631,7 +628,7 @@ proc currentSlot(node: BeaconNode): Slot =
   node.beaconClock.now.slotOrZero

 proc connectedPeersCount(node: BeaconNode): int =
-  nbc_peers.value.int
+  len(node.network.peerPool)

 proc installBeaconApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
   rpcServer.rpc("getBeaconHead") do () -> Slot:
@@ -893,8 +890,7 @@ proc start(node: BeaconNode) =
     SLOTS_PER_EPOCH,
     SECONDS_PER_SLOT,
     SPEC_VERSION,
-    dataDir = node.config.dataDir.string,
-    pcs = "start_beacon_node"
+    dataDir = node.config.dataDir.string

   if genesisTime.inFuture:
     notice "Waiting for genesis", genesisIn = genesisTime.offset
@@ -1035,7 +1031,7 @@ when hasPrompt:
     when compiles(defaultChroniclesStream.output.writer):
       defaultChroniclesStream.output.writer =
-        proc (logLevel: LogLevel, msg: LogOutputStr) {.gcsafe, raises: [Defect].} =
+        proc (logLevel: LogLevel, msg: LogOutputStr) {.raises: [Defect].} =
           try:
             # p.hidePrompt
             erase statusBar

==================== next file ====================

@@ -338,9 +338,6 @@ proc init*(T: type ChainDAGRef,
   while cur.blck != nil:
     let root = db.getStateRoot(cur.blck.root, cur.slot)
     if root.isSome():
-      # TODO load StateData from BeaconChainDB
-      # We save state root separately for empty slots which means we might
-      # sometimes not find a state even though we saved its state root
       if db.getState(root.get(), tmpState.data.data, noRollback):
         tmpState.data.root = root.get()
         tmpState.blck = cur.blck
@@ -422,24 +419,25 @@ proc getEpochRef*(dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): EpochRef =
         newEpochRef = EpochRef.init(state, cache, prevEpochRef)

       # TODO consider constraining the number of epochrefs per state
-      ancestor.blck.epochRefs.add newEpochRef
-      newEpochRef.updateKeyStores(blck.parent, dag.finalizedHead.blck)
+      if ancestor.blck.slot >= dag.finalizedHead.blck.slot:
+        # Only cache epoch information for unfinalized blocks - earlier states
+        # are seldom used (i.e. RPC), so no need to cache
+        ancestor.blck.epochRefs.add newEpochRef
+        newEpochRef.updateKeyStores(blck.parent, dag.finalizedHead.blck)
       newEpochRef

 proc getState(
     dag: ChainDAGRef, state: var StateData, stateRoot: Eth2Digest,
     blck: BlockRef): bool =
-  let stateAddr = unsafeAddr state # local scope
-  func restore(v: var BeaconState) =
-    if stateAddr == (unsafeAddr dag.headState):
-      # TODO seeing the headState in the restore shouldn't happen - we load
-      # head states only when updating the head position, and by that time
-      # the database will have gone through enough sanity checks that
-      # SSZ exceptions shouldn't happen, which is when restore happens.
-      # Nonetheless, this is an ugly workaround that needs to go away
-      doAssert false, "Cannot alias headState"
-
-    assign(stateAddr[], dag.headState)
+  let restoreAddr =
+    # Any restore point will do as long as it's not the object being updated
+    if unsafeAddr(state) == unsafeAddr(dag.headState):
+      unsafeAddr dag.tmpState
+    else:
+      unsafeAddr dag.headState
+
+  func restore(v: var BeaconState) =
+    assign(v, restoreAddr[].data.data)

   if not dag.db.getState(stateRoot, state.data.data, restore):
     return false
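The restoreAddr change above follows a simple rule: when a load into a state object fails partway through, roll it back from another intact copy, never from the object that is being overwritten. A minimal sketch of that pattern, using invented Thing/loadInto names rather than the real StateData/getState API:

# Illustrative only - pick a rollback source that isn't the load target.
type Thing = object
  value: int

proc loadInto(target: var Thing, ok: bool, restore: proc(v: var Thing)): bool =
  target.value = -1            # partial, possibly corrupt write
  if not ok:
    restore(target)            # undo the partial write
    return false
  target.value = 42
  true

when isMainModule:
  var
    head = Thing(value: 1)
    tmp = Thing(value: 2)
    state = Thing(value: 3)

  # any restore point will do as long as it's not the object being updated
  let restoreAddr = if addr(state) == addr(head): addr(tmp) else: addr(head)
  proc restore(v: var Thing) = v = restoreAddr[]

  doAssert not state.loadInto(false, restore)
  doAssert state.value == head.value   # rolled back from the intact copy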
@@ -471,7 +469,6 @@ proc putState*(dag: ChainDAGRef, state: StateData) =
   # TODO we save state at every epoch start but never remove them - we also
   # potentially save multiple states per slot if reorgs happen, meaning
   # we could easily see a state explosion
-  logScope: pcs = "save_state_at_epoch_start"

   # As a policy, we only store epoch boundary states without the epoch block
   # (if it exists) applied - the rest can be reconstructed by loading an epoch
@@ -689,13 +686,6 @@ proc updateStateData*(
     startSlot,
     blck = shortLog(bs)

-proc loadTailState*(dag: ChainDAGRef): StateData =
-  ## Load the state associated with the current tail in the dag
-  let stateRoot = dag.db.getBlock(dag.tail.root).get().message.state_root
-  let found = dag.getState(result, stateRoot, dag.tail)
-  # TODO turn into regular error, this can happen
-  doAssert found, "Failed to load tail state, database corrupt?"
-
 proc delState(dag: ChainDAGRef, bs: BlockSlot) =
   # Delete state state and mapping for a particular block+slot
   if not bs.slot.isEpoch:
@@ -716,7 +706,6 @@ proc updateHead*(
   doAssert not newHead.parent.isNil() or newHead.slot == 0

   logScope:
     newHead = shortLog(newHead)
-    pcs = "fork_choice"

   if dag.head == newHead:
     debug "No head block update"

==================== next file ====================

@@ -38,7 +38,7 @@ func getOrResolve*(dag: ChainDAGRef, quarantine: var QuarantineRef, root: Eth2Di
 proc addRawBlock*(
       dag: var ChainDAGRef, quarantine: var QuarantineRef,
       signedBlock: SignedBeaconBlock, onBlockAdded: OnBlockAdded
-     ): Result[BlockRef, BlockError]
+     ): Result[BlockRef, BlockError] {.gcsafe.}

 proc addResolvedBlock(
       dag: var ChainDAGRef, quarantine: var QuarantineRef,
@@ -47,7 +47,6 @@ proc addResolvedBlock(
       onBlockAdded: OnBlockAdded
     ) =
   # TODO move quarantine processing out of here
-  logScope: pcs = "block_resolution"

   doAssert state.data.data.slot == signedBlock.message.slot,
     "state must match block"
   doAssert state.blck.root == signedBlock.message.parent_root,

==================== next file ====================

@@ -249,7 +249,7 @@ proc main() {.async.} =
       quit 1

   if cfg.maxDelay > 0.0:
-    delayGenerator = proc (): chronos.Duration {.gcsafe.} =
+    delayGenerator = proc (): chronos.Duration =
       chronos.milliseconds (rand(cfg.minDelay..cfg.maxDelay)*1000).int

   await sendDeposits(deposits, cfg.web3Url, cfg.privateKey,

==================== next file ====================

@@ -288,7 +288,7 @@ proc openStream(node: Eth2Node,
 proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}

-proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
+proc getPeer*(node: Eth2Node, peerId: PeerID): Peer =
   node.peers.withValue(peerId, peer) do:
     return peer[]
   do:
@@ -493,7 +493,7 @@ else:
 proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
                      ResponseMsg: type,
                      timeout: Duration): Future[NetRes[ResponseMsg]]
-                     {.gcsafe, async.} =
+                     {.async.} =
   var deadline = sleepAsync timeout

   let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
@@ -578,7 +578,7 @@ proc implementSendProcBody(sendProc: SendProc) =
 proc handleIncomingStream(network: Eth2Node,
                           conn: Connection,
-                          MsgType: type) {.async, gcsafe.} =
+                          MsgType: type) {.async.} =
   mixin callUserHandler, RecType
   type MsgRec = RecType(MsgType)
@@ -679,7 +679,7 @@ proc handleIncomingStream(network: Eth2Node,
 proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
   let network = peer.network

-  proc onPeerClosed(udata: pointer) {.gcsafe.} =
+  proc onPeerClosed(udata: pointer) =
     debug "Peer (outgoing) lost", peer
     nbc_peers.set int64(len(network.peerPool))
@@ -695,7 +695,7 @@ proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
 proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
   let network = peer.network

-  proc onPeerClosed(udata: pointer) {.gcsafe.} =
+  proc onPeerClosed(udata: pointer) =
     debug "Peer (incoming) lost", peer
     nbc_peers.set int64(len(network.peerPool))
@@ -1104,7 +1104,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
 proc setupNat(conf: BeaconNodeConf): tuple[ip: Option[ValidIpAddress],
                                            tcpPort: Port,
-                                           udpPort: Port] {.gcsafe.} =
+                                           udpPort: Port] =
   # defaults
   result.tcpPort = conf.tcpPort
   result.udpPort = conf.udpPort
@@ -1185,7 +1185,9 @@ func gossipId(data: openArray[byte]): string =
 func msgIdProvider(m: messages.Message): string =
   gossipId(m.data)

-proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId: ENRForkID): Eth2Node {.gcsafe.} =
+proc createEth2Node*(
+    rng: ref BrHmacDrbgContext, conf: BeaconNodeConf,
+    enrForkId: ENRForkID): Eth2Node =
   var
     (extIp, extTcpPort, extUdpPort) = setupNat(conf)
     hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort)
@@ -1252,8 +1254,8 @@ func peersCount*(node: Eth2Node): int =
 proc subscribe*[MsgType](node: Eth2Node,
                          topic: string,
-                         msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async, gcsafe.} =
-  proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
+                         msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async.} =
+  proc execMsgHandler(topic: string, data: seq[byte]) {.async.} =
     inc nbc_gossip_messages_received
     trace "Incoming pubsub message received",
       len = data.len, topic, msgId = gossipId(data)
@@ -1270,8 +1272,8 @@ proc subscribe*[MsgType](node: Eth2Node,
   await node.pubsub.subscribe(topic & "_snappy", execMsgHandler)

-proc subscribe*(node: Eth2Node, topic: string) {.async, gcsafe.} =
-  proc dummyMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
+proc subscribe*(node: Eth2Node, topic: string) {.async.} =
+  proc dummyMsgHandler(topic: string, data: seq[byte]) {.async.} =
     discard

   await node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler)
@@ -1281,7 +1283,7 @@ proc addValidator*[MsgType](node: Eth2Node,
                             msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) =
   # Validate messages as soon as subscribed
   proc execValidator(
-      topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} =
+      topic: string, message: GossipMsg): Future[bool] {.async.} =
     trace "Validating incoming gossip message",
       len = message.data.len, topic, msgId = gossipId(message.data)
     try:

==================== next file ====================

@@ -127,13 +127,12 @@ proc storeBlock(
     start = Moment.now()
     attestationPool = self.attestationPool

-  {.gcsafe.}: # TODO: fork choice and quarantine should sync via messages instead of callbacks
-    let blck = self.chainDag.addRawBlock(self.quarantine, signedBlock) do (
-        blckRef: BlockRef, signedBlock: SignedBeaconBlock,
-        epochRef: EpochRef, state: HashedBeaconState):
-      # Callback add to fork choice if valid
-      attestationPool[].addForkChoice(
-        epochRef, blckRef, signedBlock.message, wallSlot)
+  let blck = self.chainDag.addRawBlock(self.quarantine, signedBlock) do (
+      blckRef: BlockRef, signedBlock: SignedBeaconBlock,
+      epochRef: EpochRef, state: HashedBeaconState):
+    # Callback add to fork choice if valid
+    attestationPool[].addForkChoice(
+      epochRef, blckRef, signedBlock.message, wallSlot)

   self.dumpBlock(signedBlock, blck)
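The change above drops the {.gcsafe.}: workaround and keeps only the `do` callback through which the chain dag notifies the attestation pool. For readers unfamiliar with the notation, a hedged sketch follows; DemoBlock/addBlock are invented stand-ins, not the real addRawBlock/OnBlockAdded API.

# The trailing `do` block is just an anonymous proc passed as the last argument.
type DemoBlock = object
  slot: int

proc addBlock(blocks: var seq[DemoBlock], blck: DemoBlock,
              onAdded: proc(b: DemoBlock)) =
  blocks.add(blck)
  onAdded(blck)          # notify the caller, e.g. to update fork choice

when isMainModule:
  var
    chain: seq[DemoBlock]
    lastSeen = -1

  chain.addBlock(DemoBlock(slot: 7)) do (b: DemoBlock):
    # runs once the block has been stored
    lastSeen = b.slot

  doAssert lastSeen == 7 and chain.len == 1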
@@ -254,12 +253,6 @@ proc blockValidator*(
     # already-seen data, but it is fairly aggressive about forgetting about
     # what it has seen already
     debug "Dropping already-seen gossip block", delay
-    # TODO:
-    # Potentially use a fast exit here. We only need to check that
-    # the contents of the incoming message match our previously seen
-    # version of the block. We don't need to use HTR for this - for
-    # better efficiency we can use vanilla SHA256 or direct comparison
-    # if we still have the previous block in memory.
     return false

   # Start of block processing - in reality, we have already gone through SSZ

==================== next file ====================

@@ -326,8 +326,7 @@ func prune*(
     )

   trace "Pruning blocks from fork choice",
-    finalizedRoot = shortlog(finalized_root),
-    pcs = "prune"
+    finalizedRoot = shortlog(finalized_root)

   let final_phys_index = finalized_index-self.nodes.offset
   for node_index in 0 ..< final_phys_index:

==================== next file ====================

@@ -490,7 +490,7 @@ func init*(p: typedesc[PeerInfo],
 proc pubsubLogger(conf: InspectorConf, switch: Switch,
                   resolveQueue: AsyncQueue[PeerID], topic: string,
-                  data: seq[byte]): Future[void] {.async, gcsafe.} =
+                  data: seq[byte]): Future[void] {.async.} =
   info "Received pubsub message", size = len(data),
        topic = topic,
        message = bu.toHex(data)

==================== next file ====================

@@ -155,7 +155,7 @@ proc readResponseChunk(conn: Connection, peer: Peer,
     return neterr UnexpectedEOF

 proc readResponse(conn: Connection, peer: Peer,
-                  MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.gcsafe, async.} =
+                  MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
   when MsgType is seq:
     type E = ElemType(MsgType)
     var results: MsgType

==================== next file ====================

@@ -101,9 +101,6 @@ func get_attesting_balance(
 # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#justification-and-finalization
 proc process_justification_and_finalization*(state: var BeaconState,
     cache: var StateCache, updateFlags: UpdateFlags = {}) {.nbench.} =
-  logScope: pcs = "process_justification_and_finalization"
-
   if get_current_epoch(state) <= GENESIS_EPOCH + 1:
     return

==================== next file ====================

@@ -129,9 +129,7 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool =
     true

 proc sendAttestation*(
     node: BeaconNode, attestation: Attestation, num_active_validators: uint64) =
-  logScope: pcs = "send_attestation"
-
   node.network.broadcast(
     getAttestationTopic(node.forkDigest, attestation, num_active_validators),
     attestation)
@@ -159,8 +157,6 @@ proc createAndSendAttestation(node: BeaconNode,
                               committeeLen: int,
                               indexInCommittee: int,
                               num_active_validators: uint64) {.async.} =
-  logScope: pcs = "send_attestation"
-
   var attestation = await validator.produceAndSignAttestation(
     attestationData, committeeLen, indexInCommittee, fork,
     genesis_validators_root)
@@ -234,7 +230,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
       await getRandaoReveal(val_info),
       eth1data,
       graffiti,
-      node.attestationPool[].getAttestationsForBlock(state),
+      node.attestationPool[].getAttestationsForBlock(state, cache),
       deposits,
       restore,
       cache)
@@ -253,15 +249,14 @@ proc proposeSignedBlock*(node: BeaconNode,
                          validator: AttachedValidator,
                          newBlock: SignedBeaconBlock): Future[BlockRef] {.async.} =

-  {.gcsafe.}: # TODO: fork choice and quarantine should sync via messages instead of callbacks
-    let newBlockRef = node.chainDag.addRawBlock(node.quarantine,
-                                                newBlock) do (
-        blckRef: BlockRef, signedBlock: SignedBeaconBlock,
-        epochRef: EpochRef, state: HashedBeaconState):
-      # Callback add to fork choice if valid
-      node.attestationPool[].addForkChoice(
-        epochRef, blckRef, signedBlock.message,
-        node.beaconClock.now().slotOrZero())
+  let newBlockRef = node.chainDag.addRawBlock(node.quarantine,
+                                              newBlock) do (
+      blckRef: BlockRef, signedBlock: SignedBeaconBlock,
+      epochRef: EpochRef, state: HashedBeaconState):
+    # Callback add to fork choice if valid
+    node.attestationPool[].addForkChoice(
+      epochRef, blckRef, signedBlock.message,
+      node.beaconClock.now().slotOrZero())

   if newBlockRef.isErr:
     warn "Unable to add proposed block to block pool",
@@ -289,8 +284,6 @@ proc proposeBlock(node: BeaconNode,
                   validator_index: ValidatorIndex,
                   head: BlockRef,
                   slot: Slot): Future[BlockRef] {.async.} =
-  logScope: pcs = "block_proposal"
-
   if head.slot >= slot:
     # We should normally not have a head newer than the slot we're proposing for
     # but this can happen if block proposal is delayed
@@ -319,8 +312,6 @@ proc proposeBlock(node: BeaconNode,
 proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
   ## Perform all attestations that the validators attached to this node should
   ## perform during the given slot
-  logScope: pcs = "handleAttestations"
-
   if slot + SLOTS_PER_EPOCH < head.slot:
     # The latest block we know about is a lot newer than the slot we're being
     # asked to attest to - this makes it unlikely that it will be included
@@ -404,8 +395,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
       headRoot = shortLog(head.root),
       slot = shortLog(slot),
       proposer_index = proposer.get()[0],
-      proposer = shortLog(proposer.get()[1].initPubKey()),
-      pcs = "wait_for_proposal"
+      proposer = shortLog(proposer.get()[1].initPubKey())

   return head
@@ -424,13 +414,13 @@ proc broadcastAggregatedAttestations(
   let
     committees_per_slot =
       get_committee_count_per_slot(state, aggregationSlot.epoch, cache)

   var
     slotSigs: seq[Future[ValidatorSig]] = @[]
     slotSigsData: seq[tuple[committee_index: uint64,
                             validator_idx: ValidatorIndex,
                             v: AttachedValidator]] = @[]

   for committee_index in 0'u64..<committees_per_slot:
     let committee = get_beacon_committee(
       state, aggregationSlot, committee_index.CommitteeIndex, cache)
@@ -455,7 +445,7 @@ proc broadcastAggregatedAttestations(
       # Don't broadcast when, e.g., this node isn't aggregator
       # TODO verify there is only one isSome() with test.
       if aggregateAndProof.isSome:
         let sig = await signAggregateAndProof(curr[0].v,
           aggregateAndProof.get, state.fork,
           state.genesis_validators_root)
         var signedAP = SignedAggregateAndProof(

==================== next file ====================

@@ -116,7 +116,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
         privKey.genRandaoReveal(state.fork, state.genesis_validators_root, slot),
         eth1data,
         default(GraffitiBytes),
-        attPool.getAttestationsForBlock(state),
+        attPool.getAttestationsForBlock(state, cache),
         @[],
         noRollback,
         cache)

==================== next file ====================

@@ -78,7 +78,7 @@ suiteReport "Attestation pool processing" & preset():
     check:
       process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1, cache)

-    let attestations = pool[].getAttestationsForBlock(state.data.data)
+    let attestations = pool[].getAttestationsForBlock(state.data.data, cache)

     check:
       attestations.len == 1
@@ -110,7 +110,7 @@ suiteReport "Attestation pool processing" & preset():
     discard process_slots(
       state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1, cache)

-    let attestations = pool[].getAttestationsForBlock(state.data.data)
+    let attestations = pool[].getAttestationsForBlock(state.data.data, cache)

     check:
       attestations.len == 1
@@ -134,7 +134,7 @@ suiteReport "Attestation pool processing" & preset():
     check:
       process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1, cache)

-    let attestations = pool[].getAttestationsForBlock(state.data.data)
+    let attestations = pool[].getAttestationsForBlock(state.data.data, cache)

     check:
       attestations.len == 1
@@ -161,7 +161,7 @@ suiteReport "Attestation pool processing" & preset():
     check:
       process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1, cache)

-    let attestations = pool[].getAttestationsForBlock(state.data.data)
+    let attestations = pool[].getAttestationsForBlock(state.data.data, cache)

     check:
       attestations.len == 1
@@ -187,7 +187,7 @@ suiteReport "Attestation pool processing" & preset():
     check:
       process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1, cache)

-    let attestations = pool[].getAttestationsForBlock(state.data.data)
+    let attestations = pool[].getAttestationsForBlock(state.data.data, cache)

     check:
       attestations.len == 1
@@ -398,7 +398,7 @@ suiteReport "Attestation validation " & preset():
       chainDag = init(ChainDAGRef, defaultRuntimePreset, makeTestDB(SLOTS_PER_EPOCH * 3))
       quarantine = QuarantineRef()
       pool = newClone(AttestationPool.init(chainDag, quarantine))
-      state = newClone(loadTailState(chainDag))
+      state = newClone(chainDag.headState)
       cache = StateCache()
   # Slot 0 is a finalized slot - won't be making attestations for it..
   check:

==================== next file ====================

@@ -130,7 +130,7 @@ suiteReport "Block pool processing" & preset():
     check:
       dag.getRef(default Eth2Digest) == nil

-  wrappedTimedTest "loadTailState gets genesis block on first load" & preset():
+  wrappedTimedTest "loading tail block works" & preset():
     let
       b0 = dag.get(dag.tail.root)