Sync committee related REST API implementation. (#2856)

This commit is contained in:
Eugene Kabanov 2021-09-24 01:13:25 +03:00 committed by GitHub
parent 5670d58155
commit 0c635334a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 479 additions and 34 deletions

View File

@ -9,7 +9,7 @@
import
std/[options, sequtils, tables, sets],
stew/[assign2, byteutils],
stew/[assign2, byteutils, results],
metrics, snappy, chronicles,
../spec/[
beaconstate, eth2_merkleization, eth2_ssz_serialization, forks, helpers,
@ -18,7 +18,7 @@ import
".."/beacon_chain_db,
"."/[block_pools_types, block_quarantine, forkedbeaconstate_dbhelpers]
export block_pools_types
export block_pools_types, results
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics
declareGauge beacon_head_root, "Root of the head block of the beacon chain"

View File

@ -97,6 +97,24 @@ proc toString*(kind: ValidatorFilterKind): string =
of ValidatorFilterKind.WithdrawalDone:
"withdrawal_done"
func syncCommitteeParticipants*(forkedState: ForkedHashedBeaconState,
epoch: Epoch): Result[seq[ValidatorPubKey], cstring] =
case forkedState.beaconStateFork
of BeaconStateFork.forkPhase0:
err("State's fork do not support sync committees")
of BeaconStateFork.forkAltair:
let
headSlot = forkedState.hbsAltair.data.slot
epochPeriod = syncCommitteePeriod(epoch.compute_start_slot_at_epoch())
currentPeriod = syncCommitteePeriod(headSlot)
nextPeriod = currentPeriod + 1'u64
if epochPeriod == currentPeriod:
ok(@(forkedState.hbsAltair.data.current_sync_committee.pubkeys.data))
elif epochPeriod == nextPeriod:
ok(@(forkedState.hbsAltair.data.next_sync_committee.pubkeys.data))
else:
err("Epoch is outside the sync committee period of the state")
proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
# https://ethereum.github.io/beacon-APIs/#/Beacon/getGenesis
router.api(MethodGet, "/api/eth/v1/beacon/genesis") do () -> RestApiResponse:
@ -468,7 +486,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
none[Slot]()
node.withStateForBlockSlot(bslot):
proc getCommittee(slot: Slot,
index: CommitteeIndex): RestBeaconStatesCommittees =
index: CommitteeIndex): RestBeaconStatesCommittees =
let validators = get_beacon_committee(stateData.data, slot, index,
cache).mapIt(it)
RestBeaconStatesCommittees(index: index, slot: slot,
@ -504,6 +522,85 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonError(Http500, InternalServerError)
# https://ethereum.github.io/beacon-APIs/#/Beacon/getEpochSyncCommittees
router.api(MethodGet,
"/api/eth/v1/beacon/states/{state_id}/sync_committees") do (
state_id: StateIdent, epoch: Option[Epoch]) -> RestApiResponse:
let bslot =
block:
if state_id.isErr():
return RestApiResponse.jsonError(Http400, InvalidStateIdValueError,
$state_id.error())
let bres = node.getBlockSlot(state_id.get())
if bres.isErr():
return RestApiResponse.jsonError(Http404, StateNotFoundError,
$bres.error())
bres.get()
let qepoch =
if epoch.isSome():
let repoch = epoch.get()
if repoch.isErr():
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
$repoch.error())
let res = repoch.get()
if res > MaxEpoch:
return RestApiResponse.jsonError(Http400, EpochOverflowValueError)
if res < node.dag.cfg.ALTAIR_FORK_EPOCH:
return RestApiResponse.jsonError(Http400,
EpochFromTheIncorrectForkError)
res
else:
# If ``epoch`` not present then the sync committees for the epoch of
# the state will be obtained.
bslot.slot.epoch()
node.withStateForBlockSlot(bslot):
let keys =
block:
let res = syncCommitteeParticipants(stateData().data, qepoch)
if res.isErr():
return RestApiResponse.jsonError(Http400,
$res.error())
let kres = res.get()
if len(kres) == 0:
return RestApiResponse.jsonError(Http500, InternalServerError,
"List of sync committee participants is empty")
kres
let indices =
block:
var res: seq[ValidatorIndex]
let keyset = keys.toHashSet()
for index, validator in getStateField(stateData.data,
validators).pairs():
if validator.pubkey in keyset:
res.add(ValidatorIndex(uint64(index)))
res
if len(indices) != len(keys):
return RestApiResponse.jsonError(Http500, InternalServerError,
"Could not get validator indices")
let aggregates =
block:
var
res: seq[seq[ValidatorIndex]]
offset = 0
while true:
let length = min(SYNC_SUBCOMMITTEE_SIZE, len(indices) - offset)
if length == 0:
break
res.add(@(indices.toOpenArray(offset, offset + length - 1)))
offset.inc(length)
res
return RestApiResponse.jsonResponse(GetEpochSyncCommitteesResponse(
data: RestEpochSyncCommittee(validators: indices,
validator_aggregates: aggregates)
))
return RestApiResponse.jsonError(Http400, "Could not get requested state")
# https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeaders
router.api(MethodGet, "/api/eth/v1/beacon/headers") do (
slot: Option[Slot], parent_root: Option[Eth2Digest]) -> RestApiResponse:
@ -893,6 +990,37 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
$res.error())
return RestApiResponse.jsonMsgResponse(ProposerSlashingValidationSuccess)
# https://ethereum.github.io/beacon-APIs/#/Beacon/submitPoolSyncCommitteeSignatures
router.api(MethodPost, "/api/eth/v1/beacon/pool/sync_committees") do (
contentBody: Option[ContentBody]) -> RestApiResponse:
let messages =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(seq[SyncCommitteeMessage], contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidSyncCommitteeSignatureMessageError)
dres.get()
let results = await node.sendSyncCommitteeMessages(messages)
let failures =
block:
var res: seq[RestAttestationsFailure]
for index, item in results.pairs():
if item.isErr():
res.add(RestAttestationsFailure(index: uint64(index),
message: $item.error()))
res
if len(failures) > 0:
return RestApiResponse.jsonErrorList(Http400,
SyncCommitteeMessageValidationError,
failures)
else:
return RestApiResponse.jsonMsgResponse(
SyncCommitteeMessageValidationSuccess)
# https://ethereum.github.io/beacon-APIs/#/Beacon/getPoolVoluntaryExits
router.api(MethodGet, "/api/eth/v1/beacon/pool/voluntary_exits") do (
) -> RestApiResponse:
@ -965,6 +1093,11 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/beacon/states/{state_id}/committees",
"/api/eth/v1/beacon/states/{state_id}/committees"
)
router.redirect(
MethodGet,
"/eth/v1/beacon/states/{state_id}/sync_committees",
"/api/eth/v1/beacon/states/{state_id}/sync_committees"
)
router.redirect(
MethodGet,
"/eth/v1/beacon/headers",
@ -1030,6 +1163,11 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/beacon/pool/proposer_slashings",
"/api/eth/v1/beacon/pool/proposer_slashings"
)
router.redirect(
MethodPost,
"/eth/v1/beacon/pool/sync_committees",
"/api/eth/v1/beacon/pool/sync_committees"
)
router.redirect(
MethodPost,
"/eth/v1/beacon/pool/voluntary_exits",

View File

@ -58,7 +58,9 @@ const
AggregateAndProofValidationSuccess* =
"Aggregate and proof object(s) was broadcasted"
BeaconCommitteeSubscriptionSuccess* =
"Beacon node processed committee subscription request"
"Beacon node processed committee subscription request(s)"
SyncCommitteeSubscriptionSuccess* =
"Beacon node processed sync committee subscription request(s)"
InvalidParentRootValueError* =
"Invalid parent root value"
MissingSlotValueError* =
@ -119,6 +121,10 @@ const
"Requested slot not in next wall-slot epoch"
SlotFromThePastError* =
"Requested slot from the past"
SlotFromTheIncorrectForkError* =
"Requested slot is from incorrect fork"
EpochFromTheIncorrectForkError* =
"Requested epoch is from incorrect fork"
ProposerNotFoundError* =
"Could not find proposer for the head and slot"
NoHeadForSlotError* =
@ -139,6 +145,32 @@ const
"Could not find out accepted content type"
InvalidAcceptError* =
"Incorrect accept response type"
MissingSubCommitteeIndexValueError* =
"Missing `subcommittee_index` value"
InvalidSubCommitteeIndexValueError* =
"Invalid `subcommittee_index` value"
MissingBeaconBlockRootValueError* =
"Missing `beacon_block_root` value"
InvalidBeaconBlockRootValueError* =
"Invalid `beacon_block_root` value"
EpochOutsideSyncCommitteePeriodError* =
"Epoch is outside the sync committee period of the state"
InvalidSyncCommitteeSignatureMessageError* =
"Unable to decode sync committee message(s)"
InvalidSyncCommitteeSubscriptionRequestError* =
"Unable to decode sync committee subscription request(s)"
InvalidContributionAndProofMessageError* =
"Unable to decode contribute and proof message(s)"
SyncCommitteeMessageValidationError* =
"Some errors happened while validating sync committee message(s)"
SyncCommitteeMessageValidationSuccess* =
"Sync committee message(s) was broadcasted"
ContributionAndProofValidationError* =
"Some errors happened while validating contribution and proof(s)"
ContributionAndProofValidationSuccess* =
"Contribution and proof(s) was broadcasted"
ProduceContributionError* =
"Unable to produce contribution using the passed parameters"
InternalServerError* =
"Internal server error"
NoImplementationError* =

View File

@ -3,19 +3,18 @@
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[typetraits, strutils],
stew/[results, base10],
chronicles,
json_serialization, json_serialization/std/[options, net],
nimcrypto/utils as ncrutils,
".."/[beacon_chain_db, beacon_node_common],
../networking/eth2_network,
../consensus_object_pools/[blockchain_dag, spec_cache, attestation_pool],
../validators/validator_duties,
../spec/[forks, network],
../spec/datatypes/[phase0, altair],
./rest_utils
import std/[typetraits, strutils]
import stew/[results, base10], chronicles, json_serialization,
json_serialization/std/[options, net],
nimcrypto/utils as ncrutils
import ".."/[beacon_chain_db, beacon_node_common],
".."/networking/eth2_network,
".."/consensus_object_pools/[blockchain_dag, spec_cache,
attestation_pool, sync_committee_msg_pool],
".."/validators/validator_duties,
".."/spec/[forks, network],
".."/spec/datatypes/[phase0, altair],
"."/rest_utils
logScope: topics = "rest_validatorapi"
@ -383,7 +382,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
block:
let idx = request.validator_index
if uint64(idx) >=
lenu64(getStateField(node.dag.headState.data, validators)):
lenu64(getStateField(node.dag.headState.data, validators)):
return RestApiResponse.jsonError(Http400,
InvalidValidatorIndexValueError)
getStateField(node.dag.headState.data, validators)[idx].pubkey
@ -410,6 +409,146 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
warn "Beacon committee subscription request served, but not implemented"
return RestApiResponse.jsonMsgResponse(BeaconCommitteeSubscriptionSuccess)
# https://ethereum.github.io/beacon-APIs/#/Validator/prepareSyncCommitteeSubnets
router.api(MethodPost,
"/api/eth/v1/validator/sync_committee_subscriptions") do (
contentBody: Option[ContentBody]) -> RestApiResponse:
let subscriptions =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(seq[RestSyncCommitteeSubscription],
contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidSyncCommitteeSubscriptionRequestError)
let subs = dres.get()
for item in subs:
if item.until_epoch > MaxEpoch:
return RestApiResponse.jsonError(Http400, EpochOverflowValueError)
if item.until_epoch < node.dag.cfg.ALTAIR_FORK_EPOCH:
return RestApiResponse.jsonError(Http400,
EpochFromTheIncorrectForkError)
if uint64(item.validator_index) >=
lenu64(getStateField(node.dag.headState.data, validators)):
return RestApiResponse.jsonError(Http400,
InvalidValidatorIndexValueError)
subs
warn "Sync committee subscription request served, but not implemented"
return RestApiResponse.jsonMsgResponse(SyncCommitteeSubscriptionSuccess)
# https://ethereum.github.io/beacon-APIs/#/Validator/produceSyncCommitteeContribution
router.api(MethodGet,
"/api/eth/v1/validator/sync_committee_contribution") do (
slot: Option[Slot], subcommittee_index: Option[uint64],
beacon_block_root: Option[Eth2Digest]) -> RestApiResponse:
# We doing this check to avoid any confusion in future.
static: doAssert(SYNC_COMMITTEE_SUBNET_COUNT <= high(uint8))
let qslot =
if slot.isNone():
return RestApiResponse.jsonError(Http400, MissingSlotValueError)
else:
let res = slot.get()
if res.isErr():
return RestApiResponse.jsonError(Http400, InvalidSlotValueError,
$res.error())
let rslot = res.get()
if epoch(rslot) < node.dag.cfg.ALTAIR_FORK_EPOCH:
return RestApiResponse.jsonError(Http400,
SlotFromTheIncorrectForkError)
rslot
let qindex =
if subcommittee_index.isNone():
return RestApiResponse.jsonError(Http400,
MissingSubCommitteeIndexValueError)
else:
let res = subcommittee_index.get()
if res.isErr():
return RestApiResponse.jsonError(Http400,
InvalidSubCommitteeIndexValueError,
$res.error())
let value = res.get()
if value >= SYNC_COMMITTEE_SUBNET_COUNT:
return RestApiResponse.jsonError(Http400,
InvalidSubCommitteeIndexValueError,
"subcommittee_index exceeds " &
"maximum allowed value")
value
let qroot =
if beacon_block_root.isNone():
return RestApiResponse.jsonError(Http400,
MissingBeaconBlockRootValueError)
else:
let res = beacon_block_root.get()
if res.isErr():
return RestApiResponse.jsonError(Http400,
InvalidBeaconBlockRootValueError,
$res.error())
res.get()
# Check if node is fully synced.
let sres = node.getCurrentHead(qslot)
if sres.isErr():
return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError)
var contribution = SyncCommitteeContribution()
let res = node.syncCommitteeMsgPool[].produceContribution(
qslot, qroot, SyncCommitteeIndex(qindex), contribution)
if not(res):
return RestApiResponse.jsonError(Http400, ProduceContributionError)
return RestApiResponse.jsonResponse(contribution)
# https://ethereum.github.io/beacon-APIs/#/Validator/publishContributionAndProofs
router.api(MethodPost,
"/api/eth/v1/validator/contribution_and_proofs") do (
contentBody: Option[ContentBody]) -> RestApiResponse:
let proofs =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(seq[SignedContributionAndProof],
contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidContributionAndProofMessageError)
dres.get()
let pending =
block:
var res: seq[Future[SendResult]]
for proof in proofs:
res.add(node.sendSyncCommitteeContribution(proof, true))
res
let failures =
block:
var res: seq[RestAttestationsFailure]
await allFutures(pending)
for index, future in pending.pairs():
if future.done():
let fres = future.read()
if fres.isErr():
let failure = RestAttestationsFailure(index: uint64(index),
message: $fres.error())
res.add(failure)
elif future.failed() or future.cancelled():
# This is unexpected failure, so we log the error message.
let exc = future.readError()
let failure = RestAttestationsFailure(index: uint64(index),
message: $exc.msg)
res.add(failure)
res
if len(failures) > 0:
return RestApiResponse.jsonErrorList(Http400,
ContributionAndProofValidationError,
failures)
else:
return RestApiResponse.jsonMsgResponse(
ContributionAndProofValidationSuccess
)
router.redirect(
MethodPost,
"/eth/v1/validator/duties/attester/{epoch}",
@ -450,3 +589,18 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/validator/beacon_committee_subscriptions",
"/api/eth/v1/validator/beacon_committee_subscriptions"
)
router.redirect(
MethodPost,
"/eth/v1/validator/sync_committee_subscriptions",
"/api/eth/v1/validator/sync_committee_subscriptions"
)
router.redirect(
MethodGet,
"/eth/v1/validator/sync_committee_contribution",
"/api/eth/v1/validator/sync_committee_contribution"
)
router.redirect(
MethodPost,
"/eth/v1/validator/contribution_and_proofs",
"/api/eth/v1/validator/contribution_and_proofs"
)

View File

@ -803,6 +803,22 @@ template toSszType*(v: BeaconStateFork): auto =
of BeaconStateFork.forkPhase0: Phase0Version
of BeaconStateFork.forkAltair: AltairVersion
# SyncCommitteeIndex
proc writeValue*(writer: var JsonWriter[RestJson],
value: SyncCommitteeIndex) {.
raises: [IOError, Defect].} =
writeValue(writer, Base10.toString(uint8(value)))
proc readValue*(reader: var JsonReader[RestJson],
value: var SyncCommitteeIndex) {.
raises: [IOError, SerializationError, Defect].} =
let res = Base10.decode(uint8, reader.readValue(string))
if res.isOk():
# TODO (cheatfate): Here should be present check for maximum value.
value = SyncCommitteeIndex(res.get())
else:
reader.raiseUnexpectedValue($res.error())
proc parseRoot(value: string): Result[Eth2Digest, cstring] =
try:
ok(Eth2Digest(data: hexToByteArray[32](value)))
@ -1030,6 +1046,10 @@ proc decodeString*(t: typedesc[Epoch], value: string): Result[Epoch, cstring] =
let res = ? Base10.decode(uint64, value)
ok(Epoch(res))
proc decodeString*(t: typedesc[uint64],
value: string): Result[uint64, cstring] =
Base10.decode(uint64, value)
proc decodeString*(t: typedesc[StateIdent],
value: string): Result[StateIdent, cstring] =
if len(value) > 2:

View File

@ -108,6 +108,11 @@ type
slot*: Slot
is_aggregator*: bool
RestSyncCommitteeSubscription* = object
validator_index*: ValidatorIndex
sync_committee_indices*: seq[SyncCommitteeIndex]
until_epoch*: Epoch
RestBeaconStatesFinalityCheckpoints* = object
previous_justified*: Checkpoint
current_justified*: Checkpoint
@ -260,6 +265,10 @@ type
RestBlockInfo* = object
slot*: Slot
blck* {.serializedFieldName: "block".}: Eth2Digest
RestEpochSyncCommittee* = object
validators*: seq[ValidatorIndex]
validator_aggregates*: seq[seq[ValidatorIndex]]
DataEnclosedObject*[T] = object
data*: T
@ -305,6 +314,7 @@ type
GetStateValidatorsResponse* = DataEnclosedObject[seq[RestValidator]]
GetSyncingStatusResponse* = DataEnclosedObject[RestSyncInfo]
GetVersionResponse* = DataEnclosedObject[RestNodeVersion]
GetEpochSyncCommitteesResponse* = DataEnclosedObject[RestEpochSyncCommittee]
ProduceAttestationDataResponse* = DataEnclosedObject[AttestationData]
ProduceBlockResponse* = DataEnclosedObject[phase0.BeaconBlock]
ProduceBlockResponseV2* = ForkedBeaconBlock

View File

@ -200,28 +200,118 @@ proc sendAttestation*(
proc sendSyncCommitteeMessage*(
node: BeaconNode, msg: SyncCommitteeMessage,
committeeIdx: SyncCommitteeIndex, checkSignature: bool): Future[bool] {.async.} =
committeeIdx: SyncCommitteeIndex,
checkSignature: bool): Future[SendResult] {.async.} =
# Validate sync committee message before sending it via gossip
# validation will also register the message with the sync committee
# message pool. Notably, although libp2p calls the data handler for
# any subscription on the subnet topic, it does not perform validation.
let ok = node.processor.syncCommitteeMsgValidator(
msg, committeeIdx, checkSignature)
return case ok
let res = node.processor.syncCommitteeMsgValidator(msg, committeeIdx,
checkSignature)
return
case res
of ValidationResult.Accept:
node.network.broadcastSyncCommitteeMessage(msg, committeeIdx)
beacon_sync_committee_messages_sent.inc()
true
SendResult.ok()
else:
notice "Produced sync committee message failed validation",
msg, result = $ok
false
notice "Sync committee message failed validation",
msg, result = $res
SendResult.err("Sync committee message failed validation")
proc sendSyncCommitteeMessages*(node: BeaconNode,
msgs: seq[SyncCommitteeMessage]
): Future[seq[SendResult]] {.async.} =
let validators = getStateField(node.dag.headState.data, validators)
var statuses = newSeq[Option[SendResult]](len(msgs))
let ranges =
block:
let
headSlot = getStateField(node.dag.headState.data, slot)
headCommitteePeriod = syncCommitteePeriod(headSlot)
currentStart = syncCommitteePeriodStartSlot(headCommitteePeriod)
currentFinish = currentStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD
nextStart = currentFinish
nextFinish = nextStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD
(curStart: Slot(currentStart), curFinish: Slot(currentFinish),
nxtStart: Slot(nextStart), nxtFinish: Slot(nextFinish))
let (keysCur, keysNxt) =
block:
var resCur: Table[ValidatorPubKey, int]
var resNxt: Table[ValidatorPubKey, int]
for index, msg in msgs.pairs():
if msg.validator_index < lenu64(validators):
if (msg.slot >= ranges.curStart) and (msg.slot < ranges.curFinish):
resCur[validators[msg.validator_index].pubkey] = index
elif (msg.slot >= ranges.nxtStart) and (msg.slot < ranges.nxtFinish):
resNxt[validators[msg.validator_index].pubkey] = index
else:
statuses[index] =
some(SendResult.err("Message's slot out of state's head range"))
else:
statuses[index] = some(SendResult.err("Incorrect validator's index"))
if (len(resCur) == 0) and (len(resNxt) == 0):
return statuses.mapIt(it.get())
(resCur, resNxt)
template curParticipants(): untyped =
node.dag.headState.data.hbsAltair.data.current_sync_committee.pubkeys.data
template nxtParticipants(): untyped =
node.dag.headState.data.hbsAltair.data.next_sync_committee.pubkeys.data
let (pending, indices) =
block:
var resFutures: seq[Future[SendResult]]
var resIndices: seq[int]
for committeeIdx in allSyncCommittees():
for valKey in syncSubcommittee(curParticipants(), committeeIdx):
let index = keysCur.getOrDefault(valKey, -1)
if index >= 0:
resIndices.add(index)
resFutures.add(node.sendSyncCommitteeMessage(msgs[index],
committeeIdx, true))
for committeeIdx in allSyncCommittees():
for valKey in syncSubcommittee(nxtParticipants(), committeeIdx):
let index = keysNxt.getOrDefault(valKey, -1)
if index >= 0:
resIndices.add(index)
resFutures.add(node.sendSyncCommitteeMessage(msgs[index],
committeeIdx, true))
(resFutures, resIndices)
await allFutures(pending)
for index, future in pending.pairs():
if future.done():
let fres = future.read()
if fres.isErr():
statuses[indices[index]] = some(SendResult.err(fres.error()))
else:
statuses[indices[index]] = some(SendResult.ok())
elif future.failed() or future.cancelled():
let exc = future.readError()
debug "Unexpected failure while sending committee message",
message = msgs[indices[index]], error = $exc.msg
statuses[indices[index]] = some(SendResult.err(
"Unexpected failure while sending committee message"))
let results =
block:
var res: seq[SendResult]
for item in statuses:
if item.isSome():
res.add(item.get())
else:
res.add(SendResult.err("Message validator not in sync committee"))
res
return results
proc sendSyncCommitteeContribution*(
node: BeaconNode,
msg: SignedContributionAndProof,
checkSignature: bool): Future[bool] {.async.} =
checkSignature: bool): Future[SendResult] {.async.} =
let ok = node.processor.syncCommitteeContributionValidator(
msg, checkSignature)
@ -229,11 +319,11 @@ proc sendSyncCommitteeContribution*(
of ValidationResult.Accept:
node.network.broadcastSignedContributionAndProof(msg)
beacon_sync_committee_contributions_sent.inc()
true
SendResult.ok()
else:
notice "Produced sync committee contribution failed validation",
notice "Sync committee contribution failed validation",
msg, result = $ok
false
SendResult.err("Sync committee contribution failed validation")
proc createAndSendAttestation(node: BeaconNode,
fork: Fork,
@ -598,9 +688,10 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode,
msg = await signSyncCommitteeMessage(validator, slot, fork,
genesisValidatorsRoot, head.root)
let ok = await node.sendSyncCommitteeMessage(
let res = await node.sendSyncCommitteeMessage(
msg, committeeIdx, checkSignature = false)
if not ok: # Logged in sendSyncCommitteeMessage
if res.isErr():
# Logged in sendSyncCommitteeMessage
return
if node.config.dumpEnabled:
@ -635,7 +726,7 @@ proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
for committeeIdx in allSyncCommittees():
for valKey in syncSubcommittee(syncCommittee, committeeIdx):
let validator = node.getAttachedValidator(valKey)
if validator == nil or not validator.index.isSome():
if isNil(validator) or validator.index.isNone():
continue
asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator,
committeeIdx, head)