Implement all sync committee duties in the validator client (#3583)

Other changes:

* logtrace can now verify sync committee messages and contributions
* Many unnecessary use of pairs() have been removed for consistency
* Map 40x BN response codes to BeaconNodeStatus.Incompatible in the VC
This commit is contained in:
zah 2022-05-10 13:03:40 +03:00 committed by GitHub
parent 6d11ad6ce1
commit a2ba34f686
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1296 additions and 142 deletions

View File

@ -80,7 +80,7 @@ func checkMissing*(quarantine: var Quarantine): seq[FetchRecord] =
quarantine.missing.del(k)
# simple (simplistic?) exponential backoff for retries..
for k, v in quarantine.missing.pairs():
for k, v in quarantine.missing:
if countOnes(v.tries.uint64) == 1:
result.add(FetchRecord(root: k))
@ -164,7 +164,7 @@ func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[(Eth2Digest, ValidatorSig)]
for k, v in quarantine.orphans.pairs():
for k, v in quarantine.orphans:
if not isViableOrphan(finalizedSlot, v):
toDel.add k

View File

@ -85,7 +85,7 @@ iterator get_attesting_indices*(epochRef: EpochRef,
trace "get_attesting_indices: inconsistent aggregation and committee length"
else:
for index_in_committee, validator_index in get_beacon_committee(
epochRef, slot, committee_index).pairs():
epochRef, slot, committee_index):
if bits[index_in_committee]:
yield validator_index

View File

@ -586,7 +586,7 @@ when isMainModule:
doAssert err.isOk, "compute_deltas finished with error: " & $err
for i, delta in deltas.pairs:
for i, delta in deltas:
if i == 0:
doAssert delta == Delta(Balance * validator_count), "The 0th root should have a delta"
else:
@ -625,7 +625,7 @@ when isMainModule:
doAssert err.isOk, "compute_deltas finished with error: " & $err
for i, delta in deltas.pairs:
for i, delta in deltas:
doAssert delta == Delta(Balance), "Each root should have a delta"
for vote in votes:
@ -663,7 +663,7 @@ when isMainModule:
doAssert err.isOk, "compute_deltas finished with error: " & $err
for i, delta in deltas.pairs:
for i, delta in deltas:
if i == 0:
doAssert delta == -TotalDeltas, "0th root should have a negative delta"
elif i == 1:
@ -750,7 +750,7 @@ when isMainModule:
doAssert err.isOk, "compute_deltas finished with error: " & $err
for i, delta in deltas.pairs:
for i, delta in deltas:
if i == 0:
doAssert delta == -TotalOldDeltas, "0th root should have a negative delta"
elif i == 1:

View File

@ -455,7 +455,7 @@ proc scheduleContributionChecks*(
proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
sync_committee_selection_proof_set(
fork, genesis_validators_root, contribution.slot,
contribution.subcommittee_index, aggregatorKey, proofSig)
subcommitteeIdx, aggregatorKey, proofSig)
contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
sync_committee_message_signature_set(
fork, genesis_validators_root, contribution.slot,

View File

@ -1319,7 +1319,7 @@ proc trimConnections(node: Eth2Node, count: int) =
currentVal.count + 1
)
for peerId, gScore in gossipScores.pairs:
for peerId, gScore in gossipScores:
scores[peerId] =
scores.getOrDefault(peerId) + (gScore.sum div gScore.count)

View File

@ -302,8 +302,10 @@ proc installApiHandlers*(node: SigningNode) =
let
forkInfo = request.forkInfo.get()
msg = request.syncAggregatorSelectionData
subcommittee = SyncSubcommitteeIndex.init(msg.subcommittee_index).valueOr:
return errorResponse(Http400, InvalidSubCommitteeIndexValueError)
cooked = get_sync_committee_selection_proof(forkInfo.fork,
forkInfo.genesis_validators_root, msg.slot, msg.subcommittee_index,
forkInfo.genesis_validators_root, msg.slot, subcommittee,
validator.data.privateKey)
signature = cooked.toValidatorSig().toHex()
signatureResponse(Http200, signature)

View File

@ -5,7 +5,7 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import validator_client/[common, fallback_service, duties_service,
attestation_service, fork_service]
attestation_service, fork_service, sync_committee_service]
proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes)
@ -126,6 +126,7 @@ proc asyncInit(vc: ValidatorClientRef) {.async.} =
vc.forkService = await ForkServiceRef.init(vc)
vc.dutiesService = await DutiesServiceRef.init(vc)
vc.attestationService = await AttestationServiceRef.init(vc)
vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc)
proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
lastSlot: Slot) {.async.} =
@ -159,6 +160,7 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} =
vc.forkService.start()
vc.dutiesService.start()
vc.attestationService.start()
vc.syncCommitteeService.start()
await runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)

View File

@ -272,8 +272,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
# return empty response.
if len(validatorIds) == 0:
# There is no indices, so we going to filter all the validators.
for index, validator in getStateField(state,
validators).pairs():
for index, validator in getStateField(state, validators):
let
balance = getStateField(state, balances).asSeq()[index]
status =
@ -447,7 +446,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
if len(validatorIds) == 0:
# There is no indices, so we going to return balances of all
# known validators.
for index, balance in getStateField(state, balances).pairs():
for index, balance in getStateField(state, balances):
res.add(RestValidatorBalance.init(ValidatorIndex(index),
balance))
else:
@ -905,7 +904,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
block:
var res: seq[RestAttestationsFailure]
await allFutures(pending)
for index, future in pending.pairs():
for index, future in pending:
if future.done():
let fres = future.read()
if fres.isErr():
@ -1008,7 +1007,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
let failures =
block:
var res: seq[RestAttestationsFailure]
for index, item in results.pairs():
for index, item in results:
if item.isErr():
res.add(RestAttestationsFailure(index: uint64(index),
message: $item.error()))

View File

@ -144,7 +144,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, node: BeaconNode) =
var response: PostKeystoresResponse
for index, item in request.keystores.pairs():
for index, item in request.keystores:
let res = importKeystore(node.attachedValidators[], node.network.rng[],
node.config, item, request.passwords[index])
if res.isErr():
@ -189,7 +189,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, node: BeaconNode) =
response.slashing_protection.metadata = nodeSPDIR.metadata
for index, key in keys.pairs():
for index, key in keys:
let
res = removeValidator(node.attachedValidators[], node.config, key,
KeystoreKind.Local)
@ -221,7 +221,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, node: BeaconNode) =
if value.status == $KeystoreStatus.notFound:
value.status = $KeystoreStatus.notActive
for index, key in keys.pairs():
for index, key in keys:
response.data.add(keysAndDeleteStatus[key.blob.PubKey0x.PubKeyBytes])
return RestApiResponse.jsonResponsePlain(response)
@ -254,7 +254,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, node: BeaconNode) =
var response: PostKeystoresResponse
for index, key in keys.pairs():
for index, key in keys:
let
remoteInfo = RemoteSignerInfo(
url: key.url,
@ -322,7 +322,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, node: BeaconNode) =
var response: PostKeystoresResponse
for index, key in keys.pairs():
for index, key in keys:
let keystore = RemoteKeystore(
version: 2'u64,
remoteType: RemoteSignerType.Web3Signer,
@ -352,7 +352,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, node: BeaconNode) =
dres.get.pubkeys
var response: DeleteRemoteKeystoresResponse
for index, key in keys.pairs():
for index, key in keys:
let status = node.removeValidator(key)
response.data.add(status)

View File

@ -241,7 +241,7 @@ func keysToIndices*(cacheTable: var Table[ValidatorPubKey, ValidatorIndex],
var keyset =
block:
var res: Table[ValidatorPubKey, int]
for inputIndex, pubkey in keys.pairs():
for inputIndex, pubkey in keys:
# Try to search in cache first.
cacheTable.withValue(pubkey, vindex):
if uint64(vindex[]) < totalValidatorsInState:
@ -250,8 +250,7 @@ func keysToIndices*(cacheTable: var Table[ValidatorPubKey, ValidatorIndex],
res[pubkey] = inputIndex
res
if len(keyset) > 0:
for validatorIndex, validator in getStateField(forkedState,
validators).pairs():
for validatorIndex, validator in getStateField(forkedState, validators):
keyset.withValue(validator.pubkey, listIndex):
# Store pair (pubkey, index) into cache table.
cacheTable[validator.pubkey] = ValidatorIndex(validatorIndex)

View File

@ -631,8 +631,6 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
subs
# TODO because we subscribe to all sync committee subnets, we don not need
# to remember which ones are requested by validator clients
return RestApiResponse.jsonMsgResponse(SyncCommitteeSubscriptionSuccess)
# https://ethereum.github.io/beacon-APIs/#/Validator/produceSyncCommitteeContribution
@ -717,7 +715,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
block:
var res: seq[RestAttestationsFailure]
await allFutures(pending)
for index, future in pending.pairs():
for index, future in pending:
if future.done():
let fres = future.read()
if fres.isErr():

View File

@ -230,7 +230,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
vquery = vqres.get()
if validatorIds.isNone():
for index, validator in getStateField(state, validators).pairs():
for index, validator in getStateField(state, validators):
let sres = validator.getStatus(current_epoch)
if sres.isOk:
let vstatus = sres.get()
@ -257,7 +257,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
status: vstatus,
balance: getStateField(state, balances).asSeq()[index]))
for index, validator in getStateField(state, validators).pairs():
for index, validator in getStateField(state, validators):
if validator.pubkey in vquery.keyset:
let sres = validator.getStatus(current_epoch)
if sres.isOk:
@ -292,7 +292,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
else:
raise newException(CatchableError, "Incorrect validator's state")
else:
for index, validator in getStateField(state, validators).pairs():
for index, validator in getStateField(state, validators):
if validator.pubkey in vquery.keyset:
let sres = validator.getStatus(current_epoch)
if sres.isOk:
@ -308,7 +308,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
var res: seq[RpcBalance]
withStateForStateId(stateId):
if validatorsId.isNone():
for index, value in getStateField(state, balances).pairs():
for index, value in getStateField(state, balances):
let balance = (index: uint64(index), balance: value)
res.add(balance)
else:
@ -325,7 +325,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
balance: getStateField(state, balances).asSeq()[index])
res.add(balance)
for index, validator in getStateField(state, validators).pairs():
for index, validator in getStateField(state, validators):
if validator.pubkey in vquery.keyset:
let balance = (index: uint64(index),
balance: getStateField(state, balances).asSeq()[index])

View File

@ -450,7 +450,7 @@ func get_attesting_indices*(state: ForkyBeaconState,
trace "get_attesting_indices: invalid attestation data"
else:
for index_in_committee, validator_index in get_beacon_committee(
state, data.slot, committee_index.get(), cache).pairs():
state, data.slot, committee_index.get(), cache):
if bits[index_in_committee]:
res.add validator_index

View File

@ -547,7 +547,8 @@ template `[]`*(arr: array[SYNC_COMMITTEE_SIZE, auto] | seq;
idx: IndexInSyncCommittee): auto =
arr[int idx]
makeLimitedU64(SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT)
makeLimitedU8(SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT)
makeLimitedU16(IndexInSyncCommittee, SYNC_COMMITTEE_SIZE)
func shortLog*(v: SomeBeaconBlock): auto =
(

View File

@ -535,9 +535,10 @@ func getImmutableValidatorData*(validator: Validator): ImmutableValidatorData2 =
pubkey: cookedKey.get(),
withdrawal_credentials: validator.withdrawal_credentials)
template makeLimitedU64*(T: untyped, limit: uint64) =
template makeLimitedUInt*(T: untyped, limit: SomeUnsignedInt) =
# A "tigher" type is often used for T, but for the range check to be effective
# it must make sense..
type L = typeof limit
static: doAssert limit <= distinctBase(T).high()
# Many `uint64` values in the spec have a more limited range of valid values
@ -582,6 +583,15 @@ template makeLimitedU64*(T: untyped, limit: uint64) =
template toSszType(x: T): uint64 =
{.error: "Limited types should not be used with SSZ (abi differences)".}
template makeLimitedU64*(T: untyped, limit: uint64) =
makeLimitedUInt(T, limit)
template makeLimitedU8*(T: untyped, limit: uint8) =
makeLimitedUInt(T, limit)
template makeLimitedU16*(T: type, limit: uint16) =
makeLimitedUInt(T, limit)
makeLimitedU64(CommitteeIndex, MAX_COMMITTEES_PER_SLOT)
makeLimitedU64(SubnetId, ATTESTATION_SUBNET_COUNT)

View File

@ -43,7 +43,7 @@ type
message*: string
stacktraces*: Option[seq[string]]
RestAttestationError* = object
RestDutyError* = object
code*: uint64
message*: string
failures*: seq[RestFailureItem]
@ -82,7 +82,7 @@ type
GetStateV2Response |
GetStateForkResponse |
ProduceBlockResponseV2 |
RestAttestationError |
RestDutyError |
RestValidator |
RestGenericError |
Web3SignerErrorResponse |

View File

@ -543,7 +543,6 @@ type
GetPoolProposerSlashingsResponse* = DataEnclosedObject[seq[ProposerSlashing]]
GetPoolVoluntaryExitsResponse* = DataEnclosedObject[seq[SignedVoluntaryExit]]
GetProposerDutiesResponse* = DataRootEnclosedObject[seq[RestProposerDuty]]
GetSyncCommitteeDutiesResponse* = DataEnclosedObject[seq[RestSyncCommitteeDuty]]
GetSpecResponse* = DataEnclosedObject[RestSpec]
GetSpecVCResponse* = DataEnclosedObject[RestSpecVC]
GetStateFinalityCheckpointsResponse* = DataEnclosedObject[RestBeaconStatesFinalityCheckpoints]
@ -552,12 +551,14 @@ type
GetStateValidatorBalancesResponse* = DataEnclosedObject[seq[RestValidatorBalance]]
GetStateValidatorResponse* = DataEnclosedObject[RestValidator]
GetStateValidatorsResponse* = DataEnclosedObject[seq[RestValidator]]
GetSyncCommitteeDutiesResponse* = DataRootEnclosedObject[seq[RestSyncCommitteeDuty]]
GetSyncingStatusResponse* = DataEnclosedObject[RestSyncInfo]
GetVersionResponse* = DataEnclosedObject[RestNodeVersion]
GetEpochSyncCommitteesResponse* = DataEnclosedObject[RestEpochSyncCommittee]
ProduceAttestationDataResponse* = DataEnclosedObject[AttestationData]
ProduceBlockResponse* = DataEnclosedObject[phase0.BeaconBlock]
ProduceBlockResponseV2* = ForkedBeaconBlock
ProduceSyncCommitteeContributionResponse* = DataEnclosedObject[SyncCommitteeContribution]
func `==`*(a, b: RestValidatorIndex): bool =
uint64(a) == uint64(b)
@ -749,3 +750,54 @@ func init*(t: typedesc[Web3SignerRequest], fork: Fork,
signingRoot: signingRoot,
syncCommitteeContributionAndProof: data
)
func init*(t: typedesc[RestSyncCommitteeMessage],
slot: Slot,
beacon_block_root: Eth2Digest,
validator_index: uint64,
signature: ValidatorSig): RestSyncCommitteeMessage =
RestSyncCommitteeMessage(
slot: slot,
beacon_block_root: beacon_block_root,
validator_index: validator_index,
signature: signature
)
func init*(t: typedesc[RestSyncCommitteeContribution],
slot: Slot,
beacon_block_root: Eth2Digest,
subcommittee_index: uint64,
aggregation_bits: SyncCommitteeAggregationBits,
signature: ValidatorSig): RestSyncCommitteeContribution =
RestSyncCommitteeContribution(
slot: slot,
beacon_block_root: beacon_block_root,
subcommittee_index: subcommittee_index,
aggregation_bits: aggregation_bits,
signature: signature)
func init*(t: typedesc[RestContributionAndProof],
aggregator_index: uint64,
selection_proof: ValidatorSig,
contribution: SyncCommitteeContribution): RestContributionAndProof =
RestContributionAndProof(
aggregator_index: aggregator_index,
selection_proof: selection_proof,
contribution: RestSyncCommitteeContribution.init(
contribution.slot,
contribution.beacon_block_root,
contribution.subcommittee_index,
contribution.aggregation_bits,
contribution.signature
))
func init*(t: typedesc[RestSignedContributionAndProof],
message: ContributionAndProof,
signature: ValidatorSig): RestSignedContributionAndProof =
RestSignedContributionAndProof(
message: RestContributionAndProof.init(
message.aggregator_index,
message.selection_proof,
message.contribution
),
signature: signature)

View File

@ -76,7 +76,8 @@ proc prepareSyncCommitteeSubnets*(body: seq[RestSyncCommitteeSubscription]): Res
proc produceSyncCommitteeContribution*(slot: Slot,
subcommittee_index: SyncSubcommitteeIndex,
beacon_block_root: Eth2Digest): RestPlainResponse {.
beacon_block_root: Eth2Digest
): RestResponse[ProduceSyncCommitteeContributionResponse] {.
rest, endpoint: "/eth/v1/validator/sync_committee_contribution",
meth: MethodGet.}
## https://ethereum.github.io/beacon-APIs/#/Validator/produceSyncCommitteeContribution

View File

@ -307,7 +307,7 @@ func assign*(tgt: var ForkedHashedBeaconState, src: ForkedHashedBeaconState) =
template getStateField*(x: ForkedHashedBeaconState, y: untyped): untyped =
# The use of `unsafeAddr` avoids excessive copying in certain situations, e.g.,
# ```
# for index, validator in getStateField(stateData.data, validators).pairs():
# for index, validator in getStateField(stateData.data, validators):
# ```
# Without `unsafeAddr`, the `validators` list would be copied to a temporary variable.
(case x.kind

View File

@ -268,18 +268,18 @@ proc verify_sync_committee_signature*(
# https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/validator.md#aggregation-selection
func compute_sync_committee_selection_proof_signing_root*(
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot, subcommittee_index: uint64): Eth2Digest =
slot: Slot, subcommittee_index: SyncSubcommitteeIndex): Eth2Digest =
let
domain = get_domain(fork, DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF,
slot.epoch, genesis_validators_root)
signing_data = SyncAggregatorSelectionData(
slot: slot,
subcommittee_index: subcommittee_index)
subcommittee_index: uint64 subcommittee_index)
compute_signing_root(signing_data, domain)
func get_sync_committee_selection_proof*(
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot, subcommittee_index: uint64,
slot: Slot, subcommittee_index: SyncSubcommitteeIndex,
privkey: ValidatorPrivKey): CookedSig =
let signing_root = compute_sync_committee_selection_proof_signing_root(
fork, genesis_validators_root, slot, subcommittee_index)
@ -288,7 +288,7 @@ func get_sync_committee_selection_proof*(
proc verify_sync_committee_selection_proof*(
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot, subcommittee_index: uint64,
slot: Slot, subcommittee_index: SyncSubcommitteeIndex,
pubkey: ValidatorPubKey | CookedPubKey, signature: SomeSig): bool =
withTrust(signature):
let signing_root = compute_sync_committee_selection_proof_signing_root(

View File

@ -199,7 +199,7 @@ proc sync_committee_message_signature_set*(
# See also: verify_sync_committee_selection_proof
proc sync_committee_selection_proof_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot, subcommittee_index: uint64,
slot: Slot, subcommittee_index: SyncSubcommitteeIndex,
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_sync_committee_selection_proof_signing_root(
fork, genesis_validators_root, slot, subcommittee_index)

View File

@ -209,7 +209,7 @@ template onceToAll*(vc: ValidatorClientRef, responseType: typedesc,
except CancelledError:
ApiOperation.Interrupt
for idx, node {.inject.} in onlineNodes.pairs():
for idx, node {.inject.} in onlineNodes:
it = node.client
let apiResponse {.inject.} =
block:
@ -342,16 +342,17 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
if exitNow:
break
let offlineMask = {RestBeaconNodeStatus.Offline,
RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.Uninitalized}
let offlineNodes = vc.beaconNodes.filterIt(it.status in offlineMask)
let onlineNodesCount = len(vc.beaconNodes) - len(offlineNodes)
let unusableModeMask = {RestBeaconNodeStatus.Offline,
RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.Uninitalized,
RestBeaconNodeStatus.Incompatible}
let unusableNodes = vc.beaconNodes.filterIt(it.status in unusableModeMask)
let onlineNodesCount = len(vc.beaconNodes) - len(unusableNodes)
warn "No working beacon nodes available, refreshing nodes status",
online_nodes = onlineNodesCount, offline_nodes = len(offlineNodes)
online_nodes = onlineNodesCount, unusable_nodes = len(unusableNodes)
var checkFut = vc.checkNodes(offlineMask)
var checkFut = vc.checkNodes(unusableModeMask)
let checkOp =
block:
@ -410,8 +411,7 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
break
proc getProposerDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[GetProposerDutiesResponse] {.
async.} =
epoch: Epoch): Future[GetProposerDutiesResponse] {.async.} =
logScope: request = "getProposerDuties"
vc.firstSuccessTimeout(RestResponse[GetProposerDutiesResponse], SlotDuration,
getProposerDuties(it, epoch)):
@ -428,7 +428,7 @@ proc getProposerDuties*(vc: ValidatorClientRef,
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -444,9 +444,10 @@ proc getProposerDuties*(vc: ValidatorClientRef,
raise newException(ValidatorApiError, "Unable to retrieve proposer duties")
proc getAttesterDuties*(vc: ValidatorClientRef, epoch: Epoch,
validators: seq[ValidatorIndex]
): Future[GetAttesterDutiesResponse] {.async.} =
proc getAttesterDuties*(
vc: ValidatorClientRef,
epoch: Epoch,
validators: seq[ValidatorIndex]): Future[GetAttesterDutiesResponse] {.async.} =
logScope: request = "getAttesterDuties"
vc.firstSuccessTimeout(RestResponse[GetAttesterDutiesResponse], SlotDuration,
getAttesterDuties(it, epoch, validators)):
@ -463,7 +464,7 @@ proc getAttesterDuties*(vc: ValidatorClientRef, epoch: Epoch,
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -479,6 +480,42 @@ proc getAttesterDuties*(vc: ValidatorClientRef, epoch: Epoch,
raise newException(ValidatorApiError, "Unable to retrieve attester duties")
proc getSyncCommitteeDuties*(
vc: ValidatorClientRef,
epoch: Epoch,
validators: seq[ValidatorIndex]): Future[GetSyncCommitteeDutiesResponse] {.async.} =
logScope: request = "getSyncCommitteeDuties"
vc.firstSuccessTimeout(RestResponse[GetSyncCommitteeDutiesResponse], SlotDuration,
getSyncCommitteeDuties(it, epoch, validators)):
if apiResponse.isErr():
debug "Unable to retrieve sync committee duties", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successful response", endpoint = node
return response.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve sync committee duties")
proc getForkSchedule*(vc: ValidatorClientRef): Future[seq[Fork]] {.async.} =
logScope: request = "getForkSchedule"
vc.firstSuccessTimeout(RestResponse[GetForkScheduleResponse], SlotDuration,
@ -521,7 +558,7 @@ proc getHeadStateFork*(vc: ValidatorClientRef): Future[Fork] {.async.} =
of 400, 404:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -533,9 +570,39 @@ proc getHeadStateFork*(vc: ValidatorClientRef): Future[Fork] {.async.} =
raise newException(ValidatorApiError, "Unable to retrieve head state's fork")
proc getValidators*(vc: ValidatorClientRef,
id: seq[ValidatorIdent]): Future[seq[RestValidator]] {.
async.} =
proc getHeadBlockRoot*(vc: ValidatorClientRef): Future[RestRoot] {.async.} =
logScope: request = "getHeadBlockRoot"
let blockIdent = BlockIdent.init(BlockIdentType.Head)
vc.firstSuccessTimeout(RestResponse[GetBlockRootResponse], SlotDuration,
getBlockRoot(it, blockIdent)):
if apiResponse.isErr():
debug "Unable to retrieve head block's root", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successful response", endpoint = node
return response.data.data
of 400, 404:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve head block's root")
proc getValidators*(
vc: ValidatorClientRef,
id: seq[ValidatorIdent]): Future[seq[RestValidator]] {.async.} =
logScope: request = "getStateValidators"
let stateIdent = StateIdent.init(StateIdentType.Head)
vc.firstSuccessTimeout(RestResponse[GetStateValidatorsResponse], SlotDuration,
@ -553,7 +620,7 @@ proc getValidators*(vc: ValidatorClientRef,
of 400, 404:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -566,9 +633,10 @@ proc getValidators*(vc: ValidatorClientRef,
raise newException(ValidatorApiError,
"Unable to retrieve head state's validator information")
proc produceAttestationData*(vc: ValidatorClientRef, slot: Slot,
committee_index: CommitteeIndex
): Future[AttestationData] {.async.} =
proc produceAttestationData*(
vc: ValidatorClientRef,
slot: Slot,
committee_index: CommitteeIndex): Future[AttestationData] {.async.} =
logScope: request = "produceAttestationData"
vc.firstSuccessTimeout(RestResponse[ProduceAttestationDataResponse],
OneThirdDuration,
@ -586,7 +654,7 @@ proc produceAttestationData*(vc: ValidatorClientRef, slot: Slot,
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -602,8 +670,8 @@ proc produceAttestationData*(vc: ValidatorClientRef, slot: Slot,
raise newException(ValidatorApiError, "Unable to retrieve attestation data")
proc getAttestationErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestAttestationError, response.data,
proc getDutyErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestDutyError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
@ -626,8 +694,7 @@ proc getGenericErrorMessage(response: RestPlainResponse): string =
"Unable to decode error response: [" & $res.error() & "]"
proc submitPoolAttestations*(vc: ValidatorClientRef,
data: seq[Attestation]): Future[bool] {.
async.} =
data: seq[Attestation]): Future[bool] {.async.} =
logScope: request = "submitPoolAttestations"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
submitPoolAttestations(it, data)):
@ -644,24 +711,62 @@ proc submitPoolAttestations*(vc: ValidatorClientRef,
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getAttestationErrorMessage()
RestBeaconNodeStatus.Offline
response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getAttestationErrorMessage()
response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getAttestationErrorMessage()
response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to submit attestation")
proc submitPoolSyncCommitteeSignature*(vc: ValidatorClientRef,
data: SyncCommitteeMessage): Future[bool] {.async.} =
logScope: request = "submitPoolSyncCommitteeSignatures"
let restData = RestSyncCommitteeMessage.init(
data.slot,
data.beacon_block_root,
data.validator_index,
data.signature
)
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
submitPoolSyncCommitteeSignatures(it, @[restData])):
if apiResponse.isErr():
debug "Unable to submit sync committee message", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Sync committee message was successfully published", endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to submit sync committee message")
proc getAggregatedAttestation*(vc: ValidatorClientRef, slot: Slot,
root: Eth2Digest): Future[Attestation] {.
async.} =
root: Eth2Digest): Future[Attestation] {.async.} =
logScope: request = "getAggregatedAttestation"
vc.firstSuccessTimeout(RestResponse[GetAggregatedAttestationResponse],
OneThirdDuration,
@ -679,7 +784,7 @@ proc getAggregatedAttestation*(vc: ValidatorClientRef, slot: Slot,
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -692,9 +797,48 @@ proc getAggregatedAttestation*(vc: ValidatorClientRef, slot: Slot,
raise newException(ValidatorApiError,
"Unable to retrieve aggregated attestation data")
proc publishAggregateAndProofs*(vc: ValidatorClientRef,
data: seq[SignedAggregateAndProof]): Future[bool] {.
async.} =
proc produceSyncCommitteeContribution*(
vc: ValidatorClientRef,
slot: Slot,
subcommitteeIndex: SyncSubcommitteeIndex,
root: Eth2Digest): Future[SyncCommitteeContribution] {.async.} =
logScope: request = "produceSyncCommitteeContribution"
vc.firstSuccessTimeout(RestResponse[ProduceSyncCommitteeContributionResponse],
OneThirdDuration,
produceSyncCommitteeContribution(it,
slot,
subcommitteeIndex,
root)):
if apiResponse.isErr():
debug "Unable to retrieve sync committee contribution data",
endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status:
of 200:
debug "Received successful response", endpoint = node
return response.data.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError,
"Unable to retrieve sync committee contribution data")
proc publishAggregateAndProofs*(
vc: ValidatorClientRef,
data: seq[SignedAggregateAndProof]): Future[bool] {.async.} =
logScope: request = "publishAggregateAndProofs"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
publishAggregateAndProofs(it, data)):
@ -712,7 +856,7 @@ proc publishAggregateAndProofs*(vc: ValidatorClientRef,
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
@ -727,10 +871,46 @@ proc publishAggregateAndProofs*(vc: ValidatorClientRef,
raise newException(ValidatorApiError,
"Unable to publish aggregate and proofs")
proc produceBlockV2*(vc: ValidatorClientRef, slot: Slot,
randao_reveal: ValidatorSig,
graffiti: GraffitiBytes): Future[ProduceBlockResponseV2] {.
async.} =
proc publishContributionAndProofs*(
vc: ValidatorClientRef,
data: seq[RestSignedContributionAndProof]): Future[bool] {.async.} =
logScope: request = "publishContributionAndProofs"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
publishContributionAndProofs(it, data)):
if apiResponse.isErr():
debug "Unable to publish contribution and proofs", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status:
of 200:
debug "Contribution and proofs were successfully published", endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError,
"Unable to publish contribution and proofs")
proc produceBlockV2*(
vc: ValidatorClientRef,
slot: Slot,
randao_reveal: ValidatorSig,
graffiti: GraffitiBytes): Future[ProduceBlockResponseV2] {.async.} =
logScope: request = "produceBlockV2"
vc.firstSuccessTimeout(RestResponse[ProduceBlockResponseV2],
SlotDuration,
@ -748,7 +928,7 @@ proc produceBlockV2*(vc: ValidatorClientRef, slot: Slot,
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
@ -794,7 +974,7 @@ proc publishBlock*(vc: ValidatorClientRef,
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
RestBeaconNodeStatus.Incompatible
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
@ -813,9 +993,9 @@ proc publishBlock*(vc: ValidatorClientRef,
raise newException(ValidatorApiError, "Unable to publish block")
proc prepareBeaconCommitteeSubnet*(vc: ValidatorClientRef,
data: seq[RestCommitteeSubscription]
): Future[bool] {.async.} =
proc prepareBeaconCommitteeSubnet*(
vc: ValidatorClientRef,
data: seq[RestCommitteeSubscription]): Future[bool] {.async.} =
logScope: request = "prepareBeaconCommitteeSubnet"
vc.firstSuccessTimeout(RestPlainResponse, OneThirdDuration,
prepareBeaconCommitteeSubnet(it, data)):
@ -851,3 +1031,44 @@ proc prepareBeaconCommitteeSubnet*(vc: ValidatorClientRef,
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to prepare committee subnet")
proc prepareSyncCommitteeSubnets*(
vc: ValidatorClientRef,
data: seq[RestSyncCommitteeSubscription]): Future[bool] {.async.} =
logScope: request = "prepareSyncCommitteeSubnet"
vc.firstSuccessTimeout(RestPlainResponse, OneThirdDuration,
prepareSyncCommitteeSubnets(it, data)):
if apiResponse.isErr():
debug "Unable to prepare sync committee subnet", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Sync committee subnet was successfully prepared",
endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
return false
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError,
"Unable to prepare sync committee subnet")

View File

@ -353,7 +353,7 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
for item in attesters:
res.mgetOrPut(item.data.committee_index, default).add(item)
res
for index, duties in dutiesByCommittee.pairs():
for index, duties in dutiesByCommittee:
if len(duties) > 0:
asyncSpawn service.publishAttestationsAndAggregates(slot, index, duties)

View File

@ -10,7 +10,8 @@ import
validator],
../spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client],
../validators/[keystore_management, validator_pool, slashing_protection],
".."/[conf, beacon_clock, version, nimbus_binary_common]
".."/[conf, beacon_clock, version, nimbus_binary_common],
../spec/eth2_apis/eth2_rest_serialization
export os, sets, sequtils, sequtils, chronos, presto, chronicles, confutils,
nimbus_binary_common, version, conf, options, tables, results, base10,
@ -51,12 +52,28 @@ type
BlockServiceRef* = ref object of ClientServiceRef
SyncCommitteeServiceRef* = ref object of ClientServiceRef
DutyAndProof* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
data*: RestAttesterDuty
slotSig*: Option[ValidatorSig]
SyncCommitteeDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
validator_sync_committee_index*: IndexInSyncCommittee
SyncDutyAndProof* = object
epoch*: Epoch
data*: SyncCommitteeDuty
slotSig*: Option[ValidatorSig]
SyncCommitteeSubscriptionInfo* = object
validator_index*: ValidatorIndex
validator_sync_committee_indices*: seq[IndexInSyncCommittee]
ProposerTask* = object
duty*: RestProposerDuty
future*: Future[void]
@ -78,12 +95,16 @@ type
EpochDuties* = object
duties*: Table[Epoch, DutyAndProof]
EpochSyncDuties* = object
duties*: Table[Epoch, SyncDutyAndProof]
RestBeaconNodeStatus* {.pure.} = enum
Uninitalized, Offline, Incompatible, NotSynced, Online
BeaconNodeServerRef* = ref BeaconNodeServer
AttesterMap* = Table[ValidatorPubKey, EpochDuties]
SyncCommitteeDutiesMap* = Table[ValidatorPubKey, EpochSyncDuties]
ProposerMap* = Table[Epoch, ProposedData]
ValidatorClient* = object
@ -96,6 +117,7 @@ type
dutiesService*: DutiesServiceRef
attestationService*: AttestationServiceRef
blockService*: BlockServiceRef
syncCommitteeService*: SyncCommitteeServiceRef
runSlotLoop*: Future[void]
beaconClock*: BeaconClock
attachedValidators*: ValidatorPool
@ -103,6 +125,7 @@ type
forksAvailable*: AsyncEvent
attesters*: AttesterMap
proposers*: ProposerMap
syncCommitteeDuties*: SyncCommitteeDutiesMap
beaconGenesis*: RestGenesis
proposerTasks*: Table[Slot, seq[ProposerTask]]
@ -113,6 +136,7 @@ type
const
DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
DefaultSyncDutyAndProof* = SyncDutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
@ -146,6 +170,9 @@ proc stop*(csr: ClientServiceRef) {.async.} =
proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc isDefault*(sdap: SyncDutyAndProof): bool =
sdap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc isDefault*(prd: ProposedData): bool =
prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
@ -155,6 +182,11 @@ proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest,
DutyAndProof(epoch: epoch, dependentRoot: dependentRoot, data: duty,
slotSig: slotSig)
proc init*(t: typedesc[SyncDutyAndProof], epoch: Epoch,
duty: SyncCommitteeDuty,
slotSig: Option[ValidatorSig]): SyncDutyAndProof =
SyncDutyAndProof(epoch: epoch, data: duty, slotSig: slotSig)
proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest,
data: openArray[ProposerTask]): ProposedData =
ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data)
@ -174,22 +206,45 @@ proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] =
proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[DutyAndProof] =
## Returns all `DutyAndPrrof` for the given `slot`.
## Returns all `DutyAndProof` for the given `slot`.
var res: seq[DutyAndProof]
let epoch = slot.epoch()
for key, item in vc.attesters.pairs():
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
if duty.data.slot == slot:
res.add(duty)
res
proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[SyncDutyAndProof] =
## Returns all `SyncDutyAndProof` for the given `slot`.
var res: seq[SyncDutyAndProof]
let epoch = slot.epoch()
for key, item in mpairs(vc.syncCommitteeDuties):
item.duties.withValue(epoch, duty):
res.add(duty[])
res
proc removeOldSyncPeriodDuties*(vc: ValidatorClientRef,
slot: Slot) =
if slot.is_sync_committee_period:
let epoch = slot.epoch()
var prunedDuties = SyncCommitteeDutiesMap()
for key, item in vc.syncCommitteeDuties:
var curPeriodDuties = EpochSyncDuties()
for epochKey, epochDuty in item.duties:
if epochKey >= epoch:
curPeriodDuties.duties[epochKey] = epochDuty
prunedDuties[key] = curPeriodDuties
vc.syncCommitteeDuties = prunedDuties
proc getDurationToNextAttestation*(vc: ValidatorClientRef,
slot: Slot): string =
var minSlot = FAR_FUTURE_SLOT
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
for key, item in vc.attesters.pairs():
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
let dutySlotTime = duty.data.slot
@ -222,11 +277,31 @@ proc getDurationToNextBlock*(vc: ValidatorClientRef, slot: Slot): string =
iterator attesterDutiesForEpoch*(vc: ValidatorClientRef,
epoch: Epoch): DutyAndProof =
for key, item in vc.attesters.pairs():
for key, item in vc.attesters:
let epochDuties = item.duties.getOrDefault(epoch)
if not(isDefault(epochDuties)):
yield epochDuties
proc syncMembersSubscriptionInfoForEpoch*(
vc: ValidatorClientRef,
epoch: Epoch): seq[SyncCommitteeSubscriptionInfo] =
var res: seq[SyncCommitteeSubscriptionInfo]
for key, item in mpairs(vc.syncCommitteeDuties):
var cur: SyncCommitteeSubscriptionInfo
var initialized = false
item.duties.withValue(epoch, epochDuties):
if not initialized:
cur.validator_index = epochDuties.data.validator_index
initialized = true
cur.validator_sync_committee_indices.add(
epochDuties.data.validator_sync_committee_index)
if initialized:
res.add cur
res
proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
vc.beaconClock.now() - deadline
@ -253,3 +328,6 @@ proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
else:
break
res
proc getSubcommitteeIndex*(syncCommitteeIndex: IndexInSyncCommittee): SyncSubcommitteeIndex =
SyncSubcommitteeIndex(uint16(syncCommitteeIndex) div SYNC_SUBCOMMITTEE_SIZE)

View File

@ -6,13 +6,14 @@ logScope: service = "duties_service"
type
DutiesServiceLoop* = enum
AttesterLoop, ProposerLoop, IndicesLoop
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop
chronicles.formatIt(DutiesServiceLoop):
case it
of AttesterLoop: "attester_loop"
of ProposerLoop: "proposer_loop"
of IndicesLoop: "index_loop"
of SyncCommitteeLoop: "sync_committee_loop"
proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
@ -20,6 +21,9 @@ proc checkDuty(duty: RestAttesterDuty): bool =
(uint64(duty.validator_committee_index) <= duty.committee_length) and
(uint64(duty.validator_index) <= VALIDATOR_REGISTRY_LIMIT)
proc checkSyncDuty(duty: RestSyncCommitteeDuty): bool =
uint64(duty.validator_index) <= VALIDATOR_REGISTRY_LIMIT
proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
let validatorIdents =
block:
@ -161,7 +165,7 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
await allFutures(pending)
for index, fut in pending.pairs():
for index, fut in pending:
let item = addOrReplaceItems[index]
let dap =
if fut.done():
@ -185,11 +189,109 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
return len(addOrReplaceItems)
proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[int] {.async.} =
let validatorIndices = toSeq(vc.attachedValidators.indices())
var
filteredDuties: seq[RestSyncCommitteeDuty]
offset = 0
remainingItems = len(validatorIndices)
while offset < len(validatorIndices):
let
arraySize = min(MaximumValidatorIds, remainingItems)
indices = validatorIndices[offset ..< (offset + arraySize)]
res =
try:
await vc.getSyncCommitteeDuties(epoch, indices)
except ValidatorApiError:
error "Unable to get sync committee duties", epoch = epoch
return 0
except CatchableError as exc:
error "Unexpected error occurred while getting sync committee duties",
epoch = epoch, err_name = exc.name, err_msg = exc.msg
return 0
for item in res.data:
if checkSyncDuty(item) and (item.pubkey in vc.attachedValidators):
filteredDuties.add(item)
offset += arraySize
remainingItems -= arraySize
let
relevantDuties =
block:
var res: seq[SyncCommitteeDuty]
for duty in filteredDuties:
for validatorSyncCommitteeIndex in duty.validator_sync_committee_indices:
res.add(SyncCommitteeDuty(
pubkey: duty.pubkey,
validator_index: duty.validator_index,
validator_sync_committee_index: validatorSyncCommitteeIndex))
res
fork = vc.forkAtEpoch(epoch)
genesisRoot = vc.beaconGenesis.genesis_validators_root
let addOrReplaceItems =
block:
var res: seq[tuple[epoch: Epoch, duty: SyncCommitteeDuty]]
for duty in relevantDuties:
let map = vc.syncCommitteeDuties.getOrDefault(duty.pubkey)
let epochDuty = map.duties.getOrDefault(epoch, DefaultSyncDutyAndProof)
info "Received new sync committee duty", duty, epoch
res.add((epoch, duty))
res
if len(addOrReplaceItems) > 0:
var pending: seq[Future[SignatureResult]]
var validators: seq[AttachedValidator]
let sres = vc.getCurrentSlot()
if sres.isSome():
for item in addOrReplaceItems:
let validator = vc.attachedValidators.getValidator(item.duty.pubkey)
let future = validator.getSyncCommitteeSelectionProof(
fork,
genesisRoot,
sres.get(),
getSubcommitteeIndex(item.duty.validator_sync_committee_index))
pending.add(future)
validators.add(validator)
await allFutures(pending)
for index, fut in pending:
let item = addOrReplaceItems[index]
let dap =
if fut.done():
let sigRes = fut.read()
if sigRes.isErr():
error "Unable to create slot signature using remote signer",
validator = shortLog(validators[index]),
error_msg = sigRes.error()
SyncDutyAndProof.init(item.epoch, item.duty,
none[ValidatorSig]())
else:
SyncDutyAndProof.init(item.epoch, item.duty,
some(sigRes.get()))
else:
SyncDutyAndProof.init(item.epoch, item.duty,
none[ValidatorSig]())
var validatorDuties = vc.syncCommitteeDuties.getOrDefault(item.duty.pubkey)
validatorDuties.duties[item.epoch] = dap
vc.syncCommitteeDuties[item.duty.pubkey] = validatorDuties
return len(addOrReplaceItems)
proc pruneAttesterDuties(vc: ValidatorClientRef, epoch: Epoch) =
var attesters: AttesterMap
for key, item in vc.attesters.pairs():
for key, item in vc.attesters:
var v = EpochDuties()
for epochKey, epochDuty in item.duties.pairs():
for epochKey, epochDuty in item.duties:
if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch:
v.duties[epochKey] = epochDuty
else:
@ -251,9 +353,45 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} =
vc.pruneAttesterDuties(currentEpoch)
proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} =
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
nextEpoch = currentEpoch + 1'u64
if vc.attachedValidators.count() != 0:
var counts: array[2, tuple[epoch: Epoch, count: int]]
counts[0] = (currentEpoch, await vc.pollForSyncCommitteeDuties(currentEpoch))
counts[1] = (nextEpoch, await vc.pollForSyncCommitteeDuties(nextEpoch))
if (counts[0].count == 0) and (counts[1].count == 0):
debug "No new sync committee member's duties received", slot = currentSlot
let subscriptions =
block:
var res: seq[RestSyncCommitteeSubscription]
for item in counts:
if item.count > 0:
let subscriptionsInfo = vc.syncMembersSubscriptionInfoForEpoch(item.epoch)
for subInfo in subscriptionsInfo:
let sub = RestSyncCommitteeSubscription(
validator_index: subInfo.validator_index,
sync_committee_indices: subInfo.validator_sync_committee_indices,
until_epoch: (currentEpoch + EPOCHS_PER_SYNC_COMMITTEE_PERIOD -
currentEpoch.since_sync_committee_period_start()).Epoch
)
res.add(sub)
res
if len(subscriptions) > 0:
let res = await vc.prepareSyncCommitteeSubnets(subscriptions)
if not(res):
error "Failed to subscribe validators"
proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) =
var proposers: ProposerMap
for epochKey, data in vc.proposers.pairs():
for epochKey, data in vc.proposers:
if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch:
proposers[epochKey] = data
else:
@ -323,6 +461,12 @@ proc validatorIndexLoop(service: DutiesServiceRef) {.async.} =
await vc.pollForValidatorIndices()
await service.waitForNextSlot(IndicesLoop)
proc syncCommitteeeDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
while true:
await vc.pollForSyncCommitteeDuties()
await service.waitForNextSlot(SyncCommitteeLoop)
template checkAndRestart(serviceLoop: DutiesServiceLoop,
future: Future[void], body: untyped): untyped =
if future.finished():
@ -345,6 +489,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
fut1 = service.attesterDutiesLoop()
fut2 = service.proposerDutiesLoop()
fut3 = service.validatorIndexLoop()
fut4 = service.syncCommitteeeDutiesLoop()
while true:
var breakLoop = false
@ -354,7 +499,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
if not(fut1.finished()): fut1.cancel()
if not(fut2.finished()): fut2.cancel()
if not(fut3.finished()): fut3.cancel()
await allFutures(fut1, fut2, fut3)
if not(fut4.finished()): fut4.cancel()
await allFutures(fut1, fut2, fut3, fut4)
breakLoop = true
if breakLoop:
@ -363,6 +509,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
checkAndRestart(AttesterLoop, fut1, service.attesterDutiesLoop())
checkAndRestart(ProposerLoop, fut2, service.proposerDutiesLoop())
checkAndRestart(IndicesLoop, fut3, service.validatorIndexLoop())
checkAndRestart(SyncCommitteeLoop, fut4, service.syncCommitteeeDutiesLoop())
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg

View File

@ -7,7 +7,7 @@ logScope: service = "fork_service"
proc validateForkSchedule(forks: openArray[Fork]): bool {.raises: [Defect].} =
# Check if `forks` list is linked list.
var current_version = forks[0].current_version
for index, item in forks.pairs():
for index, item in forks:
if index > 0:
if item.previous_version != current_version:
return false

View File

@ -0,0 +1,356 @@
import
std/[sets, sequtils],
chronicles,
"."/[common, api, block_service],
../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types
type
ContributionItem* = object
aggregator_index: uint64
selection_proof: ValidatorSig
validator: AttachedValidator
subcommitteeIdx: SyncSubcommitteeIndex
proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duty: SyncDutyAndProof): Future[bool] {.async.} =
let
vc = service.client
fork = vc.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
vindex = duty.data.validator_index
subcommitteeIdx = getSubcommitteeIndex(duty.data.validator_sync_committee_index)
validator =
block:
let res = vc.getValidator(duty.data.pubkey)
if res.isNone():
return false
res.get()
message =
block:
let res = await signSyncCommitteeMessage(validator, fork,
genesisValidatorsRoot,
slot, beaconBlockRoot)
if res.isErr():
error "Unable to sign committee message using remote signer",
validator = shortLog(validator), slot = slot,
block_root = shortLog(beaconBlockRoot)
return
res.get()
debug "Sending sync committee message", message = shortLog(message),
validator = shortLog(validator), validator_index = vindex,
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
let res =
try:
await vc.submitPoolSyncCommitteeSignature(message)
except ValidatorApiError:
error "Unable to publish sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex
return false
except CatchableError as exc:
error "Unexpected error occurred while publishing sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
let delay = vc.getDelay(message.slot.sync_committee_message_deadline())
if res:
notice "Sync committee message published",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
delay = delay
else:
warn "Sync committee message was not accepted by beacon node",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex, delay = delay
return res
proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncDutyAndProof]) {.async.} =
let vc = service.client
let pendingSyncCommitteeMessages =
block:
var res: seq[Future[bool]]
for duty in duties:
debug "Serving sync message duty", duty = duty.data, epoch = slot.epoch()
res.add(service.serveSyncCommitteeMessage(slot,
beaconBlockRoot,
duty))
res
let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingSyncCommitteeMessages)
except CancelledError as exc:
for fut in pendingSyncCommitteeMessages:
if not(fut.finished()):
fut.cancel()
await allFutures(pendingSyncCommitteeMessages)
raise exc
for future in pendingSyncCommitteeMessages:
if future.done():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(slot.attestation_deadline())
debug "Sync committee message statistics", total = len(pendingSyncCommitteeMessages),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
duties_count = len(duties)
proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
proof: ContributionAndProof,
validator: AttachedValidator): Future[bool] {.async.} =
let
vc = service.client
slot = proof.contribution.slot
validatorIdx = validator.index.get()
genesisRoot = vc.beaconGenesis.genesis_validators_root
fork = vc.forkAtEpoch(slot.epoch)
signedProof = (ref SignedContributionAndProof)(
message: proof)
let signature =
block:
let res = await validator.sign(signedProof, fork, genesisRoot)
if res.isErr():
error "Unable to sign sync committee contribution using remote signer",
validator = shortLog(validator),
contribution = shortLog(proof.contribution),
error_msg = res.error()
return false
res.get()
debug "Sending sync contribution",
contribution = shortLog(signedProof.message.contribution),
validator = shortLog(validator), validator_index = validatorIdx,
delay = vc.getDelay(slot.sync_contribution_deadline())
let restSignedProof = RestSignedContributionAndProof.init(
signedProof.message, signedProof.signature)
let res =
try:
await vc.publishContributionAndProofs(@[restSignedProof])
except ValidatorApiError as err:
error "Unable to publish sync contribution",
contribution = shortLog(signedProof.message.contribution),
validator = shortLog(validator),
validator_index = validatorIdx,
err_msg = err.msg
false
except CatchableError as err:
error "Unexpected error occurred while publishing sync contribution",
contribution = shortLog(signedProof.message.contribution),
validator = shortLog(validator),
err_name = err.name, err_msg = err.msg
false
if res:
notice "Sync contribution published",
validator = shortLog(validator),
validator_index = validatorIdx
else:
warn "Sync contribution was not accepted by beacon node",
contribution = shortLog(signedProof.message.contribution),
validator = shortLog(validator),
validator_index = validatorIdx
return res
proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncDutyAndProof]) {.async.} =
let
vc = service.client
contributionItems =
block:
var res: seq[ContributionItem]
for duty in duties:
let validator = vc.attachedValidators.getValidator(duty.data.pubkey)
if not isNil(validator):
if duty.slotSig.isSome:
template slotSignature: auto = duty.slotSig.get
if is_sync_committee_aggregator(slotSignature):
res.add(ContributionItem(
aggregator_index: uint64(duty.data.validator_index),
selection_proof: slotSignature,
validator: validator,
subcommitteeIdx: getSubcommitteeIndex(
duty.data.validator_sync_committee_index)
))
res
if len(contributionItems) > 0:
let pendingAggregates =
block:
var res: seq[Future[bool]]
for item in contributionItems:
let aggContribution =
try:
await vc.produceSyncCommitteeContribution(slot,
item.subcommitteeIdx,
beaconBlockRoot)
except ValidatorApiError:
error "Unable to get sync message contribution data", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot)
return
except CatchableError as exc:
error "Unexpected error occurred while getting sync message contribution",
slot = slot, beaconBlockRoot = shortLog(beaconBlockRoot),
err_name = exc.name, err_msg = exc.msg
return
let proof = ContributionAndProof(
aggregator_index: item.aggregator_index,
contribution: aggContribution,
selection_proof: item.selection_proof
)
res.add(service.serveContributionAndProof(proof, item.validator))
res
let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAggregates)
except CancelledError as err:
for fut in pendingAggregates:
if not(fut.finished()):
fut.cancel()
await allFutures(pendingAggregates)
raise err
for future in pendingAggregates:
if future.done():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(slot.aggregate_deadline())
debug "Sync message contribution statistics", total = len(pendingAggregates),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot
else:
debug "No contribution and proofs scheduled for slot", slot = slot
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
slot: Slot,
duties: seq[SyncDutyAndProof]) {.async.} =
let
vc = service.client
startTime = Moment.now()
try:
let timeout = syncCommitteeMessageSlotOffset
await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds))
let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not produced in time", slot = slot, duration = dur
block:
let delay = vc.getDelay(slot.sync_committee_message_deadline())
debug "Producing sync committee messages", delay = delay, slot = slot,
duties_count = len(duties)
let beaconBlockRoot =
block:
try:
let res = await vc.getHeadBlockRoot()
res.root
except CatchableError as exc:
error "Could not request sync message block root to sign"
return
try:
await service.produceAndPublishSyncCommitteeMessages(slot,
beaconBlockRoot,
duties)
except ValidatorApiError:
error "Unable to proceed sync committee messages", slot = slot,
duties_count = len(duties)
return
except CatchableError as exc:
error "Unexpected error while producing sync committee messages",
slot = slot,
duties_count = len(duties),
err_name = exc.name, err_msg = exc.msg
return
let contributionTime =
# chronos.Duration subtraction cannot return a negative value; in such
# case it will return `ZeroDuration`.
vc.beaconClock.durationToNextSlot() - OneThirdDuration
if contributionTime != ZeroDuration:
await sleepAsync(contributionTime)
block:
let delay = vc.getDelay(slot.sync_contribution_deadline())
debug "Producing contribution and proofs", delay = delay
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) =
let vc = service.client
removeOldSyncPeriodDuties(vc, slot)
let duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
asyncSpawn service.publishSyncMessagesAndContributions(slot, duties)
proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
try:
while true:
let sleepTime =
syncCommitteeMessageSlotOffset + vc.beaconClock.durationToNextSlot()
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
service.spawnSyncCommitteeTasks(currentSlot)
await sleepAsync(sleepTime)
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
proc init*(t: typedesc[SyncCommitteeServiceRef],
vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} =
debug "Initializing service"
var res = SyncCommitteeServiceRef(client: vc,
state: ServiceState.Initialized)
return res
proc start*(service: SyncCommitteeServiceRef) =
service.lifeFut = mainLoop(service)

View File

@ -276,7 +276,7 @@ proc sendSyncCommitteeMessages*(node: BeaconNode,
var resCur: Table[uint64, int]
var resNxt: Table[uint64, int]
for index, msg in msgs.pairs():
for index, msg in msgs:
if msg.validator_index < lenu64(state.data.validators):
let msgPeriod = sync_committee_period(msg.slot + 1)
if msgPeriod == curPeriod:
@ -316,7 +316,7 @@ proc sendSyncCommitteeMessages*(node: BeaconNode,
await allFutures(pending)
for index, future in pending.pairs():
for index, future in pending:
if future.done():
let fres = future.read()
if fres.isErr():
@ -898,7 +898,7 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
subcommitteeIdx: subcommitteeIdx)
selectionProofs.add validator.getSyncCommitteeSelectionProof(
fork, genesis_validators_root, slot, subcommitteeIdx.asUInt64)
fork, genesis_validators_root, slot, subcommitteeIdx)
await allFutures(selectionProofs)
@ -908,7 +908,7 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
var contributionsSent = 0
time = timeIt:
for i, proof in selectionProofs.pairs():
for i, proof in selectionProofs:
if not proof.completed:
continue

View File

@ -281,7 +281,7 @@ proc updateEpoch(self: var ValidatorMonitor, epoch: Epoch) =
# and hope things improve
notice "Resetting validator monitoring", epoch, monitorEpoch
for (_, monitor) in self.monitors.mpairs():
for _, monitor in self.monitors:
reset(monitor.summaries)
return

View File

@ -260,11 +260,11 @@ proc signWithRemoteValidator*(v: AttachedValidator, fork: Fork,
proc signWithRemoteValidator*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest,
slot: Slot,
subIndex: uint64): Future[SignatureResult]
subcommittee: SyncSubcommitteeIndex): Future[SignatureResult]
{.async.} =
let request = Web3SignerRequest.init(
fork, genesis_validators_root,
SyncAggregatorSelectionData(slot: slot, subcommittee_index: subIndex),
SyncAggregatorSelectionData(slot: slot, subcommittee_index: uint64 subcommittee)
)
debug "Signing sync aggregator selection data using remote signer",
validator = shortLog(v)
@ -384,7 +384,7 @@ proc getSyncCommitteeSelectionProof*(v: AttachedValidator,
fork: Fork,
genesis_validators_root: Eth2Digest,
slot: Slot,
subcommittee_index: uint64
subcommittee_index: SyncSubcommitteeIndex
): Future[SignatureResult] {.async.} =
return
case v.kind

View File

@ -23,7 +23,7 @@ const
type
StartUpCommand {.pure.} = enum
pubsub, asl, asr, aggasr, lat
pubsub, asl, asr, aggasr, scmsr, csr, lat, traceAll
LogTraceConf = object
logFiles {.
@ -74,8 +74,14 @@ type
discard
of aggasr:
discard
of scmsr:
discard
of csr:
discard
of lat:
discard
of traceAll:
discard
GossipDirection = enum
None, Incoming, Outgoing
@ -140,22 +146,71 @@ type
wallSlot: uint64
signature: string
SyncCommitteeMessageObject = object
slot: uint64
beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
validatorIndex {.serializedFieldName: "validator_index".}: uint64
signature: string
ContributionObject = object
slot: uint64
beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
subnetId: uint64
aggregationBits {.serializedFieldName: "aggregation_bits".}: string
ContributionMessageObject = object
aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
contribution: ContributionObject
selectionProof {.serializedFieldName: "selection_proof".}: string
ContributionSentObject = object
message: ContributionMessageObject
signature: string
SCMSentMessage = object of LogMessage
message: SyncCommitteeMessageObject
validator: string
SCMReceivedMessage = object of LogMessage
wallSlot: uint64
syncCommitteeMsg: SyncCommitteeMessageObject
subcommitteeIdx: uint64
ContributionSentMessage = object of LogMessage
contribution: ContributionSentObject
ContributionReceivedMessage = object of LogMessage
contribution: ContributionObject
wallSlot: uint64
aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
signature: string
selectionProof {.serializedFieldName: "selection_proof".}: string
GossipMessage = object
kind: GossipDirection
id: string
datetime: DateTime
processed: bool
SaMessageType {.pure.} = enum
AttestationSent, SlotStart
SMessageType {.pure.} = enum
AttestationSent, SCMSent, SlotStart
SlotAttMessage = object
case kind: SaMessageType
of SaMessageType.AttestationSent:
SlotMessage = object
case kind: SMessageType
of SMessageType.AttestationSent:
asmsg: AttestationSentMessage
of SaMessageType.SlotStart:
of SMessageType.SCMSent:
scmsmsg: SCMSentMessage
of SMessageType.SlotStart:
ssmsg: SlotStartMessage
# SlotMessage = object
# case kind: SMessageType
# of SMessageType.SCMSent:
# scmsmsg: SCMSentMessage
# of SMessageType.SlotStart:
# ssmsg: SlotStartMessage
SRANode = object
directory: NodeDirectory
sends: seq[AttestationSentMessage]
@ -163,6 +218,13 @@ type
aggSends: seq[AggregatedAttestationSentMessage]
aggRecvs: TableRef[string, AggregatedAttestationReceivedMessage]
SRSCNode = object
directory: NodeDirectory
sends: seq[SCMSentMessage]
recvs: TableRef[string, SCMReceivedMessage]
contributionSends: seq[ContributionSentMessage]
contributionRecvs: TableRef[string, ContributionReceivedMessage]
proc readValue(reader: var JsonReader, value: var DateTime) =
let s = reader.readValue(string)
try:
@ -198,8 +260,8 @@ proc readLogFile(file: string): seq[JsonNode] =
proc readLogFileForAttsMessages(file: string,
ignoreErrors = true,
dumpErrors = false): seq[SlotAttMessage] =
var res = newSeq[SlotAttMessage]()
dumpErrors = false): seq[SlotMessage] =
var res = newSeq[SlotMessage]()
var stream = newFileStream(file)
var line: string
var counter = 0
@ -225,13 +287,13 @@ proc readLogFileForAttsMessages(file: string,
if m.msg == "Attestation sent":
let am = Json.decode(line, AttestationSentMessage,
allowUnknownFields = true)
let m = SlotAttMessage(kind: SaMessageType.AttestationSent,
let m = SlotMessage(kind: SMessageType.AttestationSent,
asmsg: am)
res.add(m)
elif m.msg == "Slot start":
let sm = Json.decode(line, SlotStartMessage,
allowUnknownFields = true)
let m = SlotAttMessage(kind: SaMessageType.SlotStart,
let m = SlotMessage(kind: SMessageType.SlotStart,
ssmsg: sm)
res.add(m)
@ -298,6 +360,110 @@ proc readLogFileForASRMessages(file: string, srnode: var SRANode,
finally:
stream.close()
proc readLogFileForSCMSendMessages(file: string,
ignoreErrors = true,
dumpErrors = false): seq[SlotMessage] =
var res = newSeq[SlotMessage]()
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
line = stream.readLine()
inc(counter)
var m: LogMessage
try:
m = Json.decode(line, LogMessage, allowUnknownFields = true)
except SerializationError as exc:
if dumpErrors:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter, errorMsg = exc.formatMsg(line)
else:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter
if not(ignoreErrors):
raise exc
else:
continue
if m.msg == "Sync committee message sent":
let scmm = Json.decode(line, SCMSentMessage,
allowUnknownFields = true)
let m = SlotMessage(kind: SMessageType.SCMSent,
scmsmsg: scmm)
res.add(m)
elif m.msg == "Slot start":
let sm = Json.decode(line, SlotStartMessage,
allowUnknownFields = true)
let m = SlotMessage(kind: SMessageType.SlotStart,
ssmsg: sm)
res.add(m)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
lines_filtered = len(res)
result = res
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc readLogFileForSCMSRMessages(file: string, srnode: var SRSCNode,
ignoreErrors = true, dumpErrors = false) =
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
var m: LogMessage
line = stream.readLine()
inc(counter)
try:
m = Json.decode(line, LogMessage, allowUnknownFields = true)
except SerializationError as exc:
if dumpErrors:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter, errorMsg = exc.formatMsg(line)
else:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter
if not(ignoreErrors):
raise exc
else:
continue
if m.msg == "Sync committee message sent":
let sm = Json.decode(line, SCMSentMessage,
allowUnknownFields = true)
srnode.sends.add(sm)
elif m.msg == "Sync committee message received":
let rm = Json.decode(line, SCMReceivedMessage,
allowUnknownFields = true)
discard srnode.recvs.hasKeyOrPut(rm.syncCommitteeMsg.signature, rm)
elif m.msg == "Contribution received":
let rm = Json.decode(line, ContributionReceivedMessage,
allowUnknownFields = true)
discard srnode.contributionRecvs.hasKeyOrPut(rm.signature, rm)
elif m.msg == "Contribution sent":
let sm = Json.decode(line, ContributionSentMessage,
allowUnknownFields = true)
srnode.contributionSends.add(sm)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
sends_filtered = len(srnode.sends),
recvs_filtered = len(srnode.recvs)
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc readLogFileForSecondMessages(file: string, ignoreErrors = true,
dumpErrors = false): seq[LogMessage] =
var stream = newFileStream(file)
@ -480,10 +646,10 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
var currentSlot: Option[SlotStartMessage]
for item in data:
if item.kind == SaMessageType.SlotStart:
if item.kind == SMessageType.SlotStart:
currentSlot = some(item.ssmsg)
inc(slotMessagesCount)
elif item.kind == SaMessageType.AttestationSent:
elif item.kind == SMessageType.AttestationSent:
if currentSlot.isSome():
let attestationTime = currentSlot.get().timestamp -
item.asmsg.timestamp
@ -606,6 +772,109 @@ proc runAggAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
successful_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].aggSends)
proc runSCMSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
info "Check for Sync Committee Message sent/received messages"
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
var srnodes = newSeq[SRSCNode]()
for node in nodes:
var srnode = SRSCNode(
directory: node,
sends: newSeq[SCMSentMessage](),
recvs: newTable[string, SCMReceivedMessage](),
contributionSends: newSeq[ContributionSentMessage](),
contributionRecvs: newTable[string, ContributionReceivedMessage]()
)
info "Processing node", node = node.name
for logfile in node.logs:
let path = node.path & DirSep & logfile
info "Processing node's logfile", node = node.name, logfile = path
readLogFileForSCMSRMessages(path, srnode,
logConf.ignoreSerializationErrors,
logConf.dumpSerializationErrors)
srnodes.add(srnode)
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
for i in 0 ..< len(srnodes):
var success = 0
var failed = 0
for item in srnodes[i].sends:
var k = (i + 1) mod len(srnodes)
var misses = newSeq[string]()
while k != i:
if item.message.signature notin srnodes[k].recvs:
misses.add(srnodes[k].directory.name)
k = (k + 1) mod len(srnodes)
if len(misses) == 0:
inc(success)
else:
inc(failed)
info "Sync committee message was not received", sender = srnodes[i].directory.name,
signature = item.message.signature,
receivers = misses.toSimple(), send_stamp = item.timestamp
info "Statistics for sender node", sender = srnodes[i].directory.name,
successful_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].sends)
proc runContributionSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
info "Check for contribution sent/received messages"
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
var srnodes = newSeq[SRSCNode]()
for node in nodes:
var srnode = SRSCNode(
directory: node,
sends: newSeq[SCMSentMessage](),
recvs: newTable[string, SCMReceivedMessage](),
contributionSends: newSeq[ContributionSentMessage](),
contributionRecvs: newTable[string, ContributionReceivedMessage]()
)
info "Processing node", node = node.name
for logfile in node.logs:
let path = node.path & DirSep & logfile
info "Processing node's logfile", node = node.name, logfile = path
readLogFileForSCMSRMessages(path, srnode,
logConf.ignoreSerializationErrors,
logConf.dumpSerializationErrors)
srnodes.add(srnode)
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
for i in 0 ..< len(srnodes):
var success = 0
var failed = 0
for item in srnodes[i].contributionSends:
var k = (i + 1) mod len(srnodes)
var misses = newSeq[string]()
while k != i:
if item.contribution.signature notin srnodes[k].contributionRecvs:
misses.add(srnodes[k].directory.name)
k = (k + 1) mod len(srnodes)
if len(misses) == 0:
inc(success)
else:
inc(failed)
info "Contribution was not received",
sender = srnodes[i].directory.name,
signature = item.contribution.signature,
receivers = misses.toSimple(), send_stamp = item.timestamp
info "Statistics for sender node", sender = srnodes[i].directory.name,
successful_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].contributionSends)
proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string],
nodes: seq[NodeDirectory]) =
info "Check for async responsiveness"
@ -686,8 +955,20 @@ proc run(conf: LogTraceConf) =
runAttSendReceive(conf, logNodes)
of StartUpCommand.aggasr:
runAggAttSendReceive(conf, logNodes)
of StartUpCommand.scmsr:
runSCMSendReceive(conf, logNodes)
of StartUpCommand.csr:
runContributionSendReceive(conf, logNodes)
of StartUpCommand.lat:
runLatencyCheck(conf, logFiles, logNodes)
of StartUpCommand.traceAll:
runContributionSendReceive(conf, logNodes)
runSCMSendReceive(conf, logNodes)
runAggAttSendReceive(conf, logNodes)
runAttSendReceive(conf, logNodes)
runLatencyCheck(conf, logFiles, logNodes)
runPubsub(conf, logFiles)
runAttSend(conf, logFiles)
when isMainModule:
echo LogTraceHeader

View File

@ -165,7 +165,7 @@ proc collectEpochRewardsAndPenalties*(
total_balance = info.balances.current_epoch
total_balance_sqrt = integer_squareroot(total_balance)
for index, validator in info.validators.pairs:
for index, validator in info.validators:
if not is_eligible_validator(validator):
continue

View File

@ -646,7 +646,7 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) =
case info.kind
of EpochInfoFork.Phase0:
template info: untyped = info.phase0Data
for i, s in info.validators.pairs():
for i, s in info.validators:
let perf = addr perfs[i]
if RewardFlags.isActiveInPreviousEpoch in s.flags:
if s.is_previous_epoch_attester.isSome():
@ -713,7 +713,7 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) =
echo "validator_index,attestation_hits,attestation_misses,head_attestation_hits,head_attestation_misses,target_attestation_hits,target_attestation_misses,delay_avg,first_slot_head_attester_when_first_slot_empty,first_slot_head_attester_when_first_slot_not_empty"
for (i, perf) in perfs.pairs:
for i, perf in perfs:
var
count = 0'u64
sum = 0'u64
@ -929,7 +929,7 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
doAssert state.data.balances.len == previousEpochBalances.len
doAssert state.data.balances.len == rewardsAndPenalties.len
for index, validator in info.validators.pairs:
for index, validator in info.validators:
template rp: untyped = rewardsAndPenalties[index]
checkBalance(index, validator, state.data.balances[index].int64,

View File

@ -419,7 +419,7 @@ proc prepareRequest(uri: Uri,
var res: seq[tuple[key: string, value: string]]
if jheaders.kind != JObject:
return err("Field `headers` should be an object")
for key, value in jheaders.fields.pairs():
for key, value in jheaders.fields:
if value.kind != JString:
return err("Field `headers` element should be only strings")
res.add((key, value.str))
@ -770,7 +770,7 @@ proc structCmp(j1, j2: JsonNode, strict: bool): bool =
if strict:
if len(j1.fields) != len(j2.fields):
return false
for key, value in j1.fields.pairs():
for key, value in j1.fields:
let j2node = j2.getOrDefault(key)
if isNil(j2node):
return false
@ -778,7 +778,7 @@ proc structCmp(j1, j2: JsonNode, strict: bool): bool =
return false
true
else:
for key, value in j2.fields.pairs():
for key, value in j2.fields:
let j1node = j1.getOrDefault(key)
if isNil(j1node):
return false
@ -1017,7 +1017,7 @@ proc startTests(conf: RestTesterConf, uri: Uri,
return 1
res.get()
for index, item in rules.pairs():
for index, item in rules:
inputQueue.addLastNoWait(TestCase(index: index, rule: item))
for i in 0 ..< len(workers):
@ -1067,7 +1067,7 @@ proc startTests(conf: RestTesterConf, uri: Uri,
alignLeft("MESSAGE", 20) & "\r\n" &
'-'.repeat(45 + 20 + 7 + 20 + 20)
echo headerLine
for index, item in rules.pairs():
for index, item in rules:
let errorFlag =
block:
var tmp = "---"

View File

@ -194,7 +194,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
let
selectionProofSig = get_sync_committee_selection_proof(
fork, genesis_validators_root, slot, uint64 subcommitteeIdx,
fork, genesis_validators_root, slot, subcommitteeIdx,
validatorPrivKey).toValidatorSig
if is_sync_committee_aggregator(selectionProofSig):

View File

@ -244,7 +244,7 @@ cli do(validatorsDir: string, secretsDir: string,
agg: AggregateSignature
inited = false
for i, pubkey in pubkeys.pairs():
for i, pubkey in pubkeys:
validatorKeys.withValue(pubkey, privkey):
let sig = get_sync_committee_message_signature(
fork, genesis_validators_root, slot, blockRoot, privkey[])

View File

@ -91,7 +91,7 @@ CI run: $(basename "$0") --disable-htop -- --verify-finalization
and validator clients, with all beacon nodes being paired up
with a corresponding validator client)
--lighthouse-vc-nodes number of Lighthouse VC nodes (assigned before Nimbus VC nodes, default: ${LIGHTHOUSE_VC_NODES})
--enable-logtrace display logtrace "aggasr" analysis
--enable-logtrace display logtrace analysis
--log-level set the log level (default: "${LOG_LEVEL}")
--reuse-existing-data-dir instead of deleting and recreating the data dir, keep it and reuse everything we can from it
--reuse-binaries don't (re)build the binaries we need and don't delete them at the end (speeds up testing)

View File

@ -242,7 +242,7 @@ suite "Message signatures":
test "Sync committee selection proof signatures":
let
slot = default(Slot)
subcommittee_index = default(uint64)
subcommittee_index = default(SyncSubcommitteeIndex)
check:
# Matching public/private keys and genesis validator roots

View File

@ -304,6 +304,7 @@ proc makeSyncAggregate(
cfg: RuntimeConfig): SyncAggregate =
if syncCommitteeRatio <= 0.0:
return SyncAggregate.init()
let
syncCommittee =
withState(state):
@ -323,11 +324,13 @@ proc makeSyncAggregate(
latest_block_root =
withState(state): state.latest_block_root
syncCommitteePool = newClone(SyncCommitteeMsgPool.init(keys.newRng()))
type
Aggregator = object
subcommitteeIdx: SyncSubcommitteeIndex
validatorIdx: ValidatorIndex
selectionProof: ValidatorSig
var aggregators: seq[Aggregator]
for subcommitteeIdx in SyncSubcommitteeIndex:
let
@ -357,8 +360,9 @@ proc makeSyncAggregate(
MockPrivKeys[validatorIdx])
selectionProofSig = get_sync_committee_selection_proof(
fork, genesis_validators_root,
slot, subcommitteeIdx.uint64,
slot, subcommitteeIdx,
MockPrivKeys[validatorIdx])
syncCommitteePool[].addSyncCommitteeMessage(
slot,
latest_block_root,
@ -366,11 +370,13 @@ proc makeSyncAggregate(
signature,
subcommitteeIdx,
positions)
if is_sync_committee_aggregator(selectionProofSig.toValidatorSig):
aggregators.add Aggregator(
subcommitteeIdx: subcommitteeIdx,
validatorIdx: validatorIdx,
selectionProof: selectionProofSig.toValidatorSig)
for aggregator in aggregators:
var contribution: SyncCommitteeContribution
if syncCommitteePool[].produceContribution(
@ -389,6 +395,7 @@ proc makeSyncAggregate(
signature: contributionSig.toValidatorSig)
syncCommitteePool[].addContribution(
signedContributionAndProof, contribution.signature.load.get)
syncCommitteePool[].produceSyncAggregate(latest_block_root)
iterator makeTestBlocks*(