Carry out the sync committee gossip duties

Other changes:

* Add server getBlockV2(), and produceBlockV2().
* Add getBlockV2() to REST test suite.
* Add client getBlockV2(), and produceBlockV2().
* Fix URLs in comments.
* Add some primitives and fix some issues in forks.nim.
* Switch `validator_client` to V2 calls usage.
* Bump `chronos` with imports fixes.
* Bump `nim-json-serialization` for `requireAllFields`.
This commit is contained in:
zah 2021-08-30 03:58:30 +03:00 committed by GitHub
parent 82eac7a522
commit 3689c68cbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 175 additions and 26 deletions

View File

@ -56,7 +56,7 @@ export
# Eventually, we could also differentiate between user/tainted data and
# internal state that's gone through sanity checks already.
const SPEC_VERSION* = "1.0.1"
const SPEC_VERSION* = "1.1.0"
## Spec version we're aiming to be compatible with, right now
const

View File

@ -724,13 +724,14 @@ proc publishAggregateAndProofs*(vc: ValidatorClientRef,
raise newException(ValidatorApiError,
"Unable to publish aggregate and proofs")
proc produceBlock*(vc: ValidatorClientRef, slot: Slot,
randao_reveal: ValidatorSig,
graffiti: GraffitiBytes): Future[phase0.BeaconBlock] {.async.} =
logScope: request = "produceBlock"
vc.firstSuccessTimeout(RestResponse[ProduceBlockResponse],
proc produceBlockV2*(vc: ValidatorClientRef, slot: Slot,
randao_reveal: ValidatorSig,
graffiti: GraffitiBytes): Future[ProduceBlockResponseV2] {.
async.} =
logScope: request = "produceBlockV2"
vc.firstSuccessTimeout(RestResponse[ProduceBlockResponseV2],
SlotDuration,
produceBlock(it, slot, randao_reveal, graffiti)):
produceBlockV2(it, slot, randao_reveal, graffiti)):
if apiResponse.isErr():
debug "Unable to retrieve block data", endpoint = node,
error = apiResponse.error()
@ -740,7 +741,7 @@ proc produceBlock*(vc: ValidatorClientRef, slot: Slot,
case response.status:
of 200:
debug "Received successfull response", endpoint = node
return response.data.data
return response.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
@ -761,10 +762,15 @@ proc produceBlock*(vc: ValidatorClientRef, slot: Slot,
raise newException(ValidatorApiError, "Unable to retrieve block data")
proc publishBlock*(vc: ValidatorClientRef,
data: phase0.SignedBeaconBlock): Future[bool] {.async.} =
data: ForkedSignedBeaconBlock): Future[bool] {.async.} =
logScope: request = "publishBlock"
vc.firstSuccessTimeout(RestPlainResponse,
SlotDuration, publishBlock(it, data)):
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration):
case data.kind
of BeaconBlockFork.Phase0:
publishBlock(it, data.phase0Block)
of BeaconBlockFork.Altair:
publishBlock(it, data.altairBlock)
do:
if apiResponse.isErr():
debug "Unable to publish block", endpoint = node,
error = apiResponse.error()

View File

@ -1,4 +1,4 @@
import ../spec/datatypes/[phase0, altair]
import ".."/spec/forks
import common, api
import chronicles
@ -25,7 +25,7 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
let randaoReveal = await validator.genRandaoReveal(fork, genesisRoot, slot)
let beaconBlock =
try:
await vc.produceBlock(slot, randaoReveal, graffiti)
await vc.produceBlockV2(slot, randaoReveal, graffiti)
except ValidatorApiError:
error "Unable to retrieve block data", slot = slot,
wall_slot = currentSlot, validator = shortLog(validator)
@ -36,25 +36,20 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
return
let blockRoot = hash_tree_root(beaconBlock)
var signedBlock = phase0.SignedBeaconBlock(message: beaconBlock,
root: hash_tree_root(beaconBlock))
# TODO: signing_root is recomputed in signBlockProposal just after
let signing_root = compute_block_root(fork, genesisRoot, slot,
signedBlock.root)
blockRoot)
let notSlashable = vc.attachedValidators
.slashingProtection
.registerBlock(ValidatorIndex(signedBlock.message.proposer_index),
.registerBlock(ValidatorIndex(beaconBlock.proposer_index),
validator.pubKey, slot, signing_root)
if notSlashable.isOk():
let signature = await validator.signBlockProposal(fork, genesisRoot, slot,
blockRoot)
let signedBlock =
phase0.SignedBeaconBlock(message: beaconBlock, root: blockRoot,
signature: signature)
debug "Sending block", blck = shortLog(signedBlock.message),
let signedBlock = ForkedSignedBeaconBlock.init(beaconBlock, blockRoot,
signature)
debug "Sending block", blck = shortLog(signedBlock),
signature = shortLog(signature), blockRoot = shortLog(blockRoot),
validator = shortLog(validator)
@ -62,7 +57,7 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
try:
await vc.publishBlock(signedBlock)
except ValidatorApiError:
error "Unable to publish block", blck = shortLog(signedBlock.message),
error "Unable to publish block", blck = shortLog(signedBlock),
blockRoot = shortLog(blockRoot),
validator = shortLog(validator),
validator_index = validator.index.get(),
@ -73,12 +68,12 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
err_name = exc.name, err_msg = exc.msg
return
if res:
notice "Block published", blck = shortLog(signedBlock.message),
notice "Block published", blck = shortLog(signedBlock),
blockRoot = shortLog(blockRoot), validator = shortLog(validator),
validator_index = validator.index.get()
else:
warn "Block was not accepted by beacon node",
blck = shortLog(signedBlock.message),
blck = shortLog(signedBlock),
blockRoot = shortLog(blockRoot),
validator = shortLog(validator),
validator_index = validator.index.get(),

View File

@ -584,6 +584,147 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
validator = validator.pubkey,
badVoteDetails = $registered.error()
proc createAndSendSyncCommitteeMessage(node: BeaconNode,
slot: Slot,
validator: AttachedValidator,
committeeIdx: SyncCommitteeIndex,
head: BlockRef) {.async.} =
try:
let
fork = node.dag.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = node.dag.genesisValidatorsRoot
msg = await signSyncCommitteeMessage(validator, slot, fork,
genesisValidatorsRoot, head.root)
let ok = await node.sendSyncCommitteeMessage(
msg, committeeIdx, checkSignature = false)
if not ok: # Logged in sendSyncCommitteeMessage
return
if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, msg, validator.pubKey)
let
wallTime = node.beaconClock.now()
deadline = msg.slot.toBeaconTime() +
seconds(int(SECONDS_PER_SLOT div 3))
let (delayStr, delaySecs) =
if wallTime < deadline:
("-" & $(deadline - wallTime), -toFloatSeconds(deadline - wallTime))
else:
($(wallTime - deadline), toFloatSeconds(wallTime - deadline))
notice "Sync committee message sent",
message = shortLog(msg),
validator = shortLog(validator),
delay = delayStr
beacon_sync_committee_message_sent_delay.observe(delaySecs)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
notice "Error sending sync committee message", err = exc.msg
proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
# TODO Use a view type to avoid the copy
var syncCommittee = @(node.dag.syncCommitteeParticipants(slot + 1))
for committeeIdx in allSyncCommittees():
for valKey in syncSubcommittee(syncCommittee, committeeIdx):
let validator = node.getAttachedValidator(valKey)
if validator == nil or not validator.index.isSome():
continue
asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator,
committeeIdx, head)
proc signAndSendContribution(node: BeaconNode,
validator: AttachedValidator,
contribution: SyncCommitteeContribution,
selectionProof: ValidatorSig) {.async.} =
try:
let msg = (ref SignedContributionAndProof)(
message: ContributionAndProof(
aggregator_index: uint64 validator.index.get,
contribution: contribution,
selection_proof: selectionProof))
await validator.sign(msg,
node.dag.forkAtEpoch(contribution.slot.epoch),
node.dag.genesisValidatorsRoot)
# Failures logged in sendSyncCommitteeContribution
discard await node.sendSyncCommitteeContribution(msg[], false)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
notice "Error sending sync committee contribution", err = exc.msg
proc handleSyncCommitteeContributions(node: BeaconNode,
head: BlockRef, slot: Slot) {.async.} =
# TODO Use a view type to avoid the copy
let
fork = node.dag.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = node.dag.genesisValidatorsRoot
syncCommittee = @(node.dag.syncCommitteeParticipants(slot + 1))
type
AggregatorCandidate = object
validator: AttachedValidator
committeeIdx: SyncCommitteeIndex
var candidateAggregators: seq[AggregatorCandidate]
var selectionProofs: seq[Future[ValidatorSig]]
var time = timeIt:
for committeeIdx in allSyncCommittees():
# TODO Hoist outside of the loop with a view type
# to avoid the repeated offset calculations
for valKey in syncSubcommittee(syncCommittee, committeeIdx):
let validator = node.getAttachedValidator(valKey)
if validator == nil:
continue
candidateAggregators.add AggregatorCandidate(
validator: validator,
committeeIdx: committeeIdx)
selectionProofs.add validator.getSyncCommitteeSelectionProof(
fork, genesisValidatorsRoot, slot, committeeIdx.asUInt64)
await allFutures(selectionProofs)
debug "Prepared contributions selection proofs",
count = selectionProofs.len, time
var contributionsSent = 0
time = timeIt:
for i in 0 ..< selectionProofs.len:
if not selectionProofs[i].completed:
continue
let selectionProof = selectionProofs[i].read
if not is_sync_committee_aggregator(selectionProof):
continue
var contribution: SyncCommitteeContribution
let contributionWasProduced = node.syncCommitteeMsgPool[].produceContribution(
slot, head, candidateAggregators[i].committeeIdx, contribution)
if contributionWasProduced:
asyncSpawn signAndSendContribution(
node,
candidateAggregators[i].validator,
contribution,
selectionProof)
debug "Contribution sent", contribution = shortLog(contribution)
inc contributionsSent
else:
debug "Failure to produce contribution",
slot, head, subnet = candidateAggregators[i].committeeIdx
notice "Contributions sent", count = contributionsSent, time
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =
## Perform the proposal for the given slot, iff we have a validator attached
@ -827,6 +968,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
head = node.dag.head
handleAttestations(node, head, slot)
handleSyncCommitteeMessages(node, head, slot)
updateValidatorMetrics(node) # the important stuff is done, update the vanity numbers
@ -848,6 +990,10 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
let sendAggregatedAttestationsFut =
sendAggregatedAttestations(node, head, slot)
let handleSyncCommitteeContributionsFut =
handleSyncCommitteeContributions(node, head, slot)
await handleSyncCommitteeContributionsFut
await sendAggregatedAttestationsFut
if node.eth1Monitor != nil and (slot mod SLOTS_PER_EPOCH) == 0:

View File

@ -338,6 +338,7 @@ MIN_GENESIS_TIME: 0
GENESIS_DELAY: 10
DEPOSIT_CONTRACT_ADDRESS: ${DEPOSIT_CONTRACT_ADDRESS}
ETH1_FOLLOW_DISTANCE: 1
ALTAIR_FORK_EPOCH: 2
EOF
dump_logs() {

View File

@ -205,6 +205,7 @@ GENESIS_DELAY: 10
GENESIS_FORK_VERSION: 0x00000000
DEPOSIT_CONTRACT_ADDRESS: ${DEPOSIT_CONTRACT_ADDRESS}
ETH1_FOLLOW_DISTANCE: 1
ALTAIR_FORK_EPOCH: 2
EOF
if [[ "$USE_TMUX" == "yes" ]]; then