From 0c635334a24bc5384cca7a2f7b754b1d2580adf7 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 24 Sep 2021 01:13:25 +0300 Subject: [PATCH] Sync committee related REST API implementation. (#2856) --- .../consensus_object_pools/blockchain_dag.nim | 4 +- beacon_chain/rpc/rest_beacon_api.nim | 140 +++++++++++++- beacon_chain/rpc/rest_utils.nim | 34 +++- beacon_chain/rpc/rest_validator_api.nim | 182 ++++++++++++++++-- .../eth2_apis/eth2_rest_serialization.nim | 20 ++ beacon_chain/spec/eth2_apis/rest_types.nim | 10 + beacon_chain/validators/validator_duties.nim | 123 ++++++++++-- 7 files changed, 479 insertions(+), 34 deletions(-) diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index a76e967c8..7572e924b 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -9,7 +9,7 @@ import std/[options, sequtils, tables, sets], - stew/[assign2, byteutils], + stew/[assign2, byteutils, results], metrics, snappy, chronicles, ../spec/[ beaconstate, eth2_merkleization, eth2_ssz_serialization, forks, helpers, @@ -18,7 +18,7 @@ import ".."/beacon_chain_db, "."/[block_pools_types, block_quarantine, forkedbeaconstate_dbhelpers] -export block_pools_types +export block_pools_types, results # https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics declareGauge beacon_head_root, "Root of the head block of the beacon chain" diff --git a/beacon_chain/rpc/rest_beacon_api.nim b/beacon_chain/rpc/rest_beacon_api.nim index 13ea231ee..567fbda15 100644 --- a/beacon_chain/rpc/rest_beacon_api.nim +++ b/beacon_chain/rpc/rest_beacon_api.nim @@ -97,6 +97,24 @@ proc toString*(kind: ValidatorFilterKind): string = of ValidatorFilterKind.WithdrawalDone: "withdrawal_done" +func syncCommitteeParticipants*(forkedState: ForkedHashedBeaconState, + epoch: Epoch): Result[seq[ValidatorPubKey], cstring] = + case forkedState.beaconStateFork + of BeaconStateFork.forkPhase0: + err("State's fork do not support sync committees") + of BeaconStateFork.forkAltair: + let + headSlot = forkedState.hbsAltair.data.slot + epochPeriod = syncCommitteePeriod(epoch.compute_start_slot_at_epoch()) + currentPeriod = syncCommitteePeriod(headSlot) + nextPeriod = currentPeriod + 1'u64 + if epochPeriod == currentPeriod: + ok(@(forkedState.hbsAltair.data.current_sync_committee.pubkeys.data)) + elif epochPeriod == nextPeriod: + ok(@(forkedState.hbsAltair.data.next_sync_committee.pubkeys.data)) + else: + err("Epoch is outside the sync committee period of the state") + proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = # https://ethereum.github.io/beacon-APIs/#/Beacon/getGenesis router.api(MethodGet, "/api/eth/v1/beacon/genesis") do () -> RestApiResponse: @@ -468,7 +486,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = none[Slot]() node.withStateForBlockSlot(bslot): proc getCommittee(slot: Slot, - index: CommitteeIndex): RestBeaconStatesCommittees = + index: CommitteeIndex): RestBeaconStatesCommittees = let validators = get_beacon_committee(stateData.data, slot, index, cache).mapIt(it) RestBeaconStatesCommittees(index: index, slot: slot, @@ -504,6 +522,85 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = return RestApiResponse.jsonError(Http500, InternalServerError) + # https://ethereum.github.io/beacon-APIs/#/Beacon/getEpochSyncCommittees + router.api(MethodGet, + "/api/eth/v1/beacon/states/{state_id}/sync_committees") do ( + state_id: StateIdent, epoch: Option[Epoch]) -> RestApiResponse: + let bslot = + block: + if state_id.isErr(): + return RestApiResponse.jsonError(Http400, InvalidStateIdValueError, + $state_id.error()) + let bres = node.getBlockSlot(state_id.get()) + if bres.isErr(): + return RestApiResponse.jsonError(Http404, StateNotFoundError, + $bres.error()) + bres.get() + + let qepoch = + if epoch.isSome(): + let repoch = epoch.get() + if repoch.isErr(): + return RestApiResponse.jsonError(Http400, InvalidEpochValueError, + $repoch.error()) + let res = repoch.get() + if res > MaxEpoch: + return RestApiResponse.jsonError(Http400, EpochOverflowValueError) + if res < node.dag.cfg.ALTAIR_FORK_EPOCH: + return RestApiResponse.jsonError(Http400, + EpochFromTheIncorrectForkError) + res + else: + # If ``epoch`` not present then the sync committees for the epoch of + # the state will be obtained. + bslot.slot.epoch() + + node.withStateForBlockSlot(bslot): + let keys = + block: + let res = syncCommitteeParticipants(stateData().data, qepoch) + if res.isErr(): + return RestApiResponse.jsonError(Http400, + $res.error()) + let kres = res.get() + if len(kres) == 0: + return RestApiResponse.jsonError(Http500, InternalServerError, + "List of sync committee participants is empty") + kres + + let indices = + block: + var res: seq[ValidatorIndex] + let keyset = keys.toHashSet() + for index, validator in getStateField(stateData.data, + validators).pairs(): + if validator.pubkey in keyset: + res.add(ValidatorIndex(uint64(index))) + res + + if len(indices) != len(keys): + return RestApiResponse.jsonError(Http500, InternalServerError, + "Could not get validator indices") + let aggregates = + block: + var + res: seq[seq[ValidatorIndex]] + offset = 0 + while true: + let length = min(SYNC_SUBCOMMITTEE_SIZE, len(indices) - offset) + if length == 0: + break + res.add(@(indices.toOpenArray(offset, offset + length - 1))) + offset.inc(length) + res + + return RestApiResponse.jsonResponse(GetEpochSyncCommitteesResponse( + data: RestEpochSyncCommittee(validators: indices, + validator_aggregates: aggregates) + )) + + return RestApiResponse.jsonError(Http400, "Could not get requested state") + # https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeaders router.api(MethodGet, "/api/eth/v1/beacon/headers") do ( slot: Option[Slot], parent_root: Option[Eth2Digest]) -> RestApiResponse: @@ -893,6 +990,37 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = $res.error()) return RestApiResponse.jsonMsgResponse(ProposerSlashingValidationSuccess) + # https://ethereum.github.io/beacon-APIs/#/Beacon/submitPoolSyncCommitteeSignatures + router.api(MethodPost, "/api/eth/v1/beacon/pool/sync_committees") do ( + contentBody: Option[ContentBody]) -> RestApiResponse: + let messages = + block: + if contentBody.isNone(): + return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) + let dres = decodeBody(seq[SyncCommitteeMessage], contentBody.get()) + if dres.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidSyncCommitteeSignatureMessageError) + dres.get() + + let results = await node.sendSyncCommitteeMessages(messages) + + let failures = + block: + var res: seq[RestAttestationsFailure] + for index, item in results.pairs(): + if item.isErr(): + res.add(RestAttestationsFailure(index: uint64(index), + message: $item.error())) + res + if len(failures) > 0: + return RestApiResponse.jsonErrorList(Http400, + SyncCommitteeMessageValidationError, + failures) + else: + return RestApiResponse.jsonMsgResponse( + SyncCommitteeMessageValidationSuccess) + # https://ethereum.github.io/beacon-APIs/#/Beacon/getPoolVoluntaryExits router.api(MethodGet, "/api/eth/v1/beacon/pool/voluntary_exits") do ( ) -> RestApiResponse: @@ -965,6 +1093,11 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = "/eth/v1/beacon/states/{state_id}/committees", "/api/eth/v1/beacon/states/{state_id}/committees" ) + router.redirect( + MethodGet, + "/eth/v1/beacon/states/{state_id}/sync_committees", + "/api/eth/v1/beacon/states/{state_id}/sync_committees" + ) router.redirect( MethodGet, "/eth/v1/beacon/headers", @@ -1030,6 +1163,11 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = "/eth/v1/beacon/pool/proposer_slashings", "/api/eth/v1/beacon/pool/proposer_slashings" ) + router.redirect( + MethodPost, + "/eth/v1/beacon/pool/sync_committees", + "/api/eth/v1/beacon/pool/sync_committees" + ) router.redirect( MethodPost, "/eth/v1/beacon/pool/voluntary_exits", diff --git a/beacon_chain/rpc/rest_utils.nim b/beacon_chain/rpc/rest_utils.nim index e949985ca..c1cf79285 100644 --- a/beacon_chain/rpc/rest_utils.nim +++ b/beacon_chain/rpc/rest_utils.nim @@ -58,7 +58,9 @@ const AggregateAndProofValidationSuccess* = "Aggregate and proof object(s) was broadcasted" BeaconCommitteeSubscriptionSuccess* = - "Beacon node processed committee subscription request" + "Beacon node processed committee subscription request(s)" + SyncCommitteeSubscriptionSuccess* = + "Beacon node processed sync committee subscription request(s)" InvalidParentRootValueError* = "Invalid parent root value" MissingSlotValueError* = @@ -119,6 +121,10 @@ const "Requested slot not in next wall-slot epoch" SlotFromThePastError* = "Requested slot from the past" + SlotFromTheIncorrectForkError* = + "Requested slot is from incorrect fork" + EpochFromTheIncorrectForkError* = + "Requested epoch is from incorrect fork" ProposerNotFoundError* = "Could not find proposer for the head and slot" NoHeadForSlotError* = @@ -139,6 +145,32 @@ const "Could not find out accepted content type" InvalidAcceptError* = "Incorrect accept response type" + MissingSubCommitteeIndexValueError* = + "Missing `subcommittee_index` value" + InvalidSubCommitteeIndexValueError* = + "Invalid `subcommittee_index` value" + MissingBeaconBlockRootValueError* = + "Missing `beacon_block_root` value" + InvalidBeaconBlockRootValueError* = + "Invalid `beacon_block_root` value" + EpochOutsideSyncCommitteePeriodError* = + "Epoch is outside the sync committee period of the state" + InvalidSyncCommitteeSignatureMessageError* = + "Unable to decode sync committee message(s)" + InvalidSyncCommitteeSubscriptionRequestError* = + "Unable to decode sync committee subscription request(s)" + InvalidContributionAndProofMessageError* = + "Unable to decode contribute and proof message(s)" + SyncCommitteeMessageValidationError* = + "Some errors happened while validating sync committee message(s)" + SyncCommitteeMessageValidationSuccess* = + "Sync committee message(s) was broadcasted" + ContributionAndProofValidationError* = + "Some errors happened while validating contribution and proof(s)" + ContributionAndProofValidationSuccess* = + "Contribution and proof(s) was broadcasted" + ProduceContributionError* = + "Unable to produce contribution using the passed parameters" InternalServerError* = "Internal server error" NoImplementationError* = diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim index 4a06ff3e7..2f66a4859 100644 --- a/beacon_chain/rpc/rest_validator_api.nim +++ b/beacon_chain/rpc/rest_validator_api.nim @@ -3,19 +3,18 @@ # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. -import - std/[typetraits, strutils], - stew/[results, base10], - chronicles, - json_serialization, json_serialization/std/[options, net], - nimcrypto/utils as ncrutils, - ".."/[beacon_chain_db, beacon_node_common], - ../networking/eth2_network, - ../consensus_object_pools/[blockchain_dag, spec_cache, attestation_pool], - ../validators/validator_duties, - ../spec/[forks, network], - ../spec/datatypes/[phase0, altair], - ./rest_utils +import std/[typetraits, strutils] +import stew/[results, base10], chronicles, json_serialization, + json_serialization/std/[options, net], + nimcrypto/utils as ncrutils +import ".."/[beacon_chain_db, beacon_node_common], + ".."/networking/eth2_network, + ".."/consensus_object_pools/[blockchain_dag, spec_cache, + attestation_pool, sync_committee_msg_pool], + ".."/validators/validator_duties, + ".."/spec/[forks, network], + ".."/spec/datatypes/[phase0, altair], + "."/rest_utils logScope: topics = "rest_validatorapi" @@ -383,7 +382,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = block: let idx = request.validator_index if uint64(idx) >= - lenu64(getStateField(node.dag.headState.data, validators)): + lenu64(getStateField(node.dag.headState.data, validators)): return RestApiResponse.jsonError(Http400, InvalidValidatorIndexValueError) getStateField(node.dag.headState.data, validators)[idx].pubkey @@ -410,6 +409,146 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = warn "Beacon committee subscription request served, but not implemented" return RestApiResponse.jsonMsgResponse(BeaconCommitteeSubscriptionSuccess) + # https://ethereum.github.io/beacon-APIs/#/Validator/prepareSyncCommitteeSubnets + router.api(MethodPost, + "/api/eth/v1/validator/sync_committee_subscriptions") do ( + contentBody: Option[ContentBody]) -> RestApiResponse: + let subscriptions = + block: + if contentBody.isNone(): + return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) + let dres = decodeBody(seq[RestSyncCommitteeSubscription], + contentBody.get()) + if dres.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidSyncCommitteeSubscriptionRequestError) + let subs = dres.get() + for item in subs: + if item.until_epoch > MaxEpoch: + return RestApiResponse.jsonError(Http400, EpochOverflowValueError) + if item.until_epoch < node.dag.cfg.ALTAIR_FORK_EPOCH: + return RestApiResponse.jsonError(Http400, + EpochFromTheIncorrectForkError) + if uint64(item.validator_index) >= + lenu64(getStateField(node.dag.headState.data, validators)): + return RestApiResponse.jsonError(Http400, + InvalidValidatorIndexValueError) + subs + + warn "Sync committee subscription request served, but not implemented" + return RestApiResponse.jsonMsgResponse(SyncCommitteeSubscriptionSuccess) + + # https://ethereum.github.io/beacon-APIs/#/Validator/produceSyncCommitteeContribution + router.api(MethodGet, + "/api/eth/v1/validator/sync_committee_contribution") do ( + slot: Option[Slot], subcommittee_index: Option[uint64], + beacon_block_root: Option[Eth2Digest]) -> RestApiResponse: + # We doing this check to avoid any confusion in future. + static: doAssert(SYNC_COMMITTEE_SUBNET_COUNT <= high(uint8)) + let qslot = + if slot.isNone(): + return RestApiResponse.jsonError(Http400, MissingSlotValueError) + else: + let res = slot.get() + if res.isErr(): + return RestApiResponse.jsonError(Http400, InvalidSlotValueError, + $res.error()) + let rslot = res.get() + if epoch(rslot) < node.dag.cfg.ALTAIR_FORK_EPOCH: + return RestApiResponse.jsonError(Http400, + SlotFromTheIncorrectForkError) + rslot + let qindex = + if subcommittee_index.isNone(): + return RestApiResponse.jsonError(Http400, + MissingSubCommitteeIndexValueError) + else: + let res = subcommittee_index.get() + if res.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidSubCommitteeIndexValueError, + $res.error()) + let value = res.get() + if value >= SYNC_COMMITTEE_SUBNET_COUNT: + return RestApiResponse.jsonError(Http400, + InvalidSubCommitteeIndexValueError, + "subcommittee_index exceeds " & + "maximum allowed value") + value + let qroot = + if beacon_block_root.isNone(): + return RestApiResponse.jsonError(Http400, + MissingBeaconBlockRootValueError) + else: + let res = beacon_block_root.get() + if res.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidBeaconBlockRootValueError, + $res.error()) + res.get() + + # Check if node is fully synced. + let sres = node.getCurrentHead(qslot) + if sres.isErr(): + return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError) + + var contribution = SyncCommitteeContribution() + let res = node.syncCommitteeMsgPool[].produceContribution( + qslot, qroot, SyncCommitteeIndex(qindex), contribution) + if not(res): + return RestApiResponse.jsonError(Http400, ProduceContributionError) + return RestApiResponse.jsonResponse(contribution) + + # https://ethereum.github.io/beacon-APIs/#/Validator/publishContributionAndProofs + router.api(MethodPost, + "/api/eth/v1/validator/contribution_and_proofs") do ( + contentBody: Option[ContentBody]) -> RestApiResponse: + let proofs = + block: + if contentBody.isNone(): + return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) + let dres = decodeBody(seq[SignedContributionAndProof], + contentBody.get()) + if dres.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidContributionAndProofMessageError) + dres.get() + + let pending = + block: + var res: seq[Future[SendResult]] + for proof in proofs: + res.add(node.sendSyncCommitteeContribution(proof, true)) + res + + let failures = + block: + var res: seq[RestAttestationsFailure] + await allFutures(pending) + for index, future in pending.pairs(): + if future.done(): + let fres = future.read() + if fres.isErr(): + let failure = RestAttestationsFailure(index: uint64(index), + message: $fres.error()) + res.add(failure) + elif future.failed() or future.cancelled(): + # This is unexpected failure, so we log the error message. + let exc = future.readError() + let failure = RestAttestationsFailure(index: uint64(index), + message: $exc.msg) + res.add(failure) + res + + if len(failures) > 0: + return RestApiResponse.jsonErrorList(Http400, + ContributionAndProofValidationError, + failures) + else: + return RestApiResponse.jsonMsgResponse( + ContributionAndProofValidationSuccess + ) + router.redirect( MethodPost, "/eth/v1/validator/duties/attester/{epoch}", @@ -450,3 +589,18 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = "/eth/v1/validator/beacon_committee_subscriptions", "/api/eth/v1/validator/beacon_committee_subscriptions" ) + router.redirect( + MethodPost, + "/eth/v1/validator/sync_committee_subscriptions", + "/api/eth/v1/validator/sync_committee_subscriptions" + ) + router.redirect( + MethodGet, + "/eth/v1/validator/sync_committee_contribution", + "/api/eth/v1/validator/sync_committee_contribution" + ) + router.redirect( + MethodPost, + "/eth/v1/validator/contribution_and_proofs", + "/api/eth/v1/validator/contribution_and_proofs" + ) diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index d2b7f7b78..f5f4f48bb 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -803,6 +803,22 @@ template toSszType*(v: BeaconStateFork): auto = of BeaconStateFork.forkPhase0: Phase0Version of BeaconStateFork.forkAltair: AltairVersion +# SyncCommitteeIndex +proc writeValue*(writer: var JsonWriter[RestJson], + value: SyncCommitteeIndex) {. + raises: [IOError, Defect].} = + writeValue(writer, Base10.toString(uint8(value))) + +proc readValue*(reader: var JsonReader[RestJson], + value: var SyncCommitteeIndex) {. + raises: [IOError, SerializationError, Defect].} = + let res = Base10.decode(uint8, reader.readValue(string)) + if res.isOk(): + # TODO (cheatfate): Here should be present check for maximum value. + value = SyncCommitteeIndex(res.get()) + else: + reader.raiseUnexpectedValue($res.error()) + proc parseRoot(value: string): Result[Eth2Digest, cstring] = try: ok(Eth2Digest(data: hexToByteArray[32](value))) @@ -1030,6 +1046,10 @@ proc decodeString*(t: typedesc[Epoch], value: string): Result[Epoch, cstring] = let res = ? Base10.decode(uint64, value) ok(Epoch(res)) +proc decodeString*(t: typedesc[uint64], + value: string): Result[uint64, cstring] = + Base10.decode(uint64, value) + proc decodeString*(t: typedesc[StateIdent], value: string): Result[StateIdent, cstring] = if len(value) > 2: diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index 273ca6570..022febf2b 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -108,6 +108,11 @@ type slot*: Slot is_aggregator*: bool + RestSyncCommitteeSubscription* = object + validator_index*: ValidatorIndex + sync_committee_indices*: seq[SyncCommitteeIndex] + until_epoch*: Epoch + RestBeaconStatesFinalityCheckpoints* = object previous_justified*: Checkpoint current_justified*: Checkpoint @@ -260,6 +265,10 @@ type RestBlockInfo* = object slot*: Slot blck* {.serializedFieldName: "block".}: Eth2Digest + + RestEpochSyncCommittee* = object + validators*: seq[ValidatorIndex] + validator_aggregates*: seq[seq[ValidatorIndex]] DataEnclosedObject*[T] = object data*: T @@ -305,6 +314,7 @@ type GetStateValidatorsResponse* = DataEnclosedObject[seq[RestValidator]] GetSyncingStatusResponse* = DataEnclosedObject[RestSyncInfo] GetVersionResponse* = DataEnclosedObject[RestNodeVersion] + GetEpochSyncCommitteesResponse* = DataEnclosedObject[RestEpochSyncCommittee] ProduceAttestationDataResponse* = DataEnclosedObject[AttestationData] ProduceBlockResponse* = DataEnclosedObject[phase0.BeaconBlock] ProduceBlockResponseV2* = ForkedBeaconBlock diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 629fd109c..84049d763 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -200,28 +200,118 @@ proc sendAttestation*( proc sendSyncCommitteeMessage*( node: BeaconNode, msg: SyncCommitteeMessage, - committeeIdx: SyncCommitteeIndex, checkSignature: bool): Future[bool] {.async.} = + committeeIdx: SyncCommitteeIndex, + checkSignature: bool): Future[SendResult] {.async.} = # Validate sync committee message before sending it via gossip # validation will also register the message with the sync committee # message pool. Notably, although libp2p calls the data handler for # any subscription on the subnet topic, it does not perform validation. - let ok = node.processor.syncCommitteeMsgValidator( - msg, committeeIdx, checkSignature) - - return case ok + let res = node.processor.syncCommitteeMsgValidator(msg, committeeIdx, + checkSignature) + return + case res of ValidationResult.Accept: node.network.broadcastSyncCommitteeMessage(msg, committeeIdx) beacon_sync_committee_messages_sent.inc() - true + SendResult.ok() else: - notice "Produced sync committee message failed validation", - msg, result = $ok - false + notice "Sync committee message failed validation", + msg, result = $res + SendResult.err("Sync committee message failed validation") + +proc sendSyncCommitteeMessages*(node: BeaconNode, + msgs: seq[SyncCommitteeMessage] + ): Future[seq[SendResult]] {.async.} = + let validators = getStateField(node.dag.headState.data, validators) + var statuses = newSeq[Option[SendResult]](len(msgs)) + + let ranges = + block: + let + headSlot = getStateField(node.dag.headState.data, slot) + headCommitteePeriod = syncCommitteePeriod(headSlot) + currentStart = syncCommitteePeriodStartSlot(headCommitteePeriod) + currentFinish = currentStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD + nextStart = currentFinish + nextFinish = nextStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD + (curStart: Slot(currentStart), curFinish: Slot(currentFinish), + nxtStart: Slot(nextStart), nxtFinish: Slot(nextFinish)) + + let (keysCur, keysNxt) = + block: + var resCur: Table[ValidatorPubKey, int] + var resNxt: Table[ValidatorPubKey, int] + for index, msg in msgs.pairs(): + if msg.validator_index < lenu64(validators): + if (msg.slot >= ranges.curStart) and (msg.slot < ranges.curFinish): + resCur[validators[msg.validator_index].pubkey] = index + elif (msg.slot >= ranges.nxtStart) and (msg.slot < ranges.nxtFinish): + resNxt[validators[msg.validator_index].pubkey] = index + else: + statuses[index] = + some(SendResult.err("Message's slot out of state's head range")) + else: + statuses[index] = some(SendResult.err("Incorrect validator's index")) + if (len(resCur) == 0) and (len(resNxt) == 0): + return statuses.mapIt(it.get()) + (resCur, resNxt) + + template curParticipants(): untyped = + node.dag.headState.data.hbsAltair.data.current_sync_committee.pubkeys.data + template nxtParticipants(): untyped = + node.dag.headState.data.hbsAltair.data.next_sync_committee.pubkeys.data + + let (pending, indices) = + block: + var resFutures: seq[Future[SendResult]] + var resIndices: seq[int] + for committeeIdx in allSyncCommittees(): + for valKey in syncSubcommittee(curParticipants(), committeeIdx): + let index = keysCur.getOrDefault(valKey, -1) + if index >= 0: + resIndices.add(index) + resFutures.add(node.sendSyncCommitteeMessage(msgs[index], + committeeIdx, true)) + for committeeIdx in allSyncCommittees(): + for valKey in syncSubcommittee(nxtParticipants(), committeeIdx): + let index = keysNxt.getOrDefault(valKey, -1) + if index >= 0: + resIndices.add(index) + resFutures.add(node.sendSyncCommitteeMessage(msgs[index], + committeeIdx, true)) + (resFutures, resIndices) + + await allFutures(pending) + + for index, future in pending.pairs(): + if future.done(): + let fres = future.read() + if fres.isErr(): + statuses[indices[index]] = some(SendResult.err(fres.error())) + else: + statuses[indices[index]] = some(SendResult.ok()) + elif future.failed() or future.cancelled(): + let exc = future.readError() + debug "Unexpected failure while sending committee message", + message = msgs[indices[index]], error = $exc.msg + statuses[indices[index]] = some(SendResult.err( + "Unexpected failure while sending committee message")) + + let results = + block: + var res: seq[SendResult] + for item in statuses: + if item.isSome(): + res.add(item.get()) + else: + res.add(SendResult.err("Message validator not in sync committee")) + res + return results proc sendSyncCommitteeContribution*( node: BeaconNode, msg: SignedContributionAndProof, - checkSignature: bool): Future[bool] {.async.} = + checkSignature: bool): Future[SendResult] {.async.} = let ok = node.processor.syncCommitteeContributionValidator( msg, checkSignature) @@ -229,11 +319,11 @@ proc sendSyncCommitteeContribution*( of ValidationResult.Accept: node.network.broadcastSignedContributionAndProof(msg) beacon_sync_committee_contributions_sent.inc() - true + SendResult.ok() else: - notice "Produced sync committee contribution failed validation", + notice "Sync committee contribution failed validation", msg, result = $ok - false + SendResult.err("Sync committee contribution failed validation") proc createAndSendAttestation(node: BeaconNode, fork: Fork, @@ -598,9 +688,10 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode, msg = await signSyncCommitteeMessage(validator, slot, fork, genesisValidatorsRoot, head.root) - let ok = await node.sendSyncCommitteeMessage( + let res = await node.sendSyncCommitteeMessage( msg, committeeIdx, checkSignature = false) - if not ok: # Logged in sendSyncCommitteeMessage + if res.isErr(): + # Logged in sendSyncCommitteeMessage return if node.config.dumpEnabled: @@ -635,7 +726,7 @@ proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) = for committeeIdx in allSyncCommittees(): for valKey in syncSubcommittee(syncCommittee, committeeIdx): let validator = node.getAttachedValidator(valKey) - if validator == nil or not validator.index.isSome(): + if isNil(validator) or validator.index.isNone(): continue asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator, committeeIdx, head)