Make new sync committee code compilable.

This commit is contained in:
cheatfate 2023-05-09 22:14:21 +03:00
parent 9fac6b3809
commit 77657974bb
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
5 changed files with 564 additions and 308 deletions

View File

@ -20,10 +20,6 @@ export
options, eth2_rest_serialization, blockchain_dag, presto, rest_types,
rest_constants
type
ValidatorIndexError* {.pure.} = enum
UnsupportedValue, TooHighValue
func match(data: openArray[char], charset: set[char]): int =
for ch in data:
if ch notin charset:
@ -203,26 +199,6 @@ template strData*(body: ContentBody): string =
bind fromBytes
string.fromBytes(body.data)
func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex,
ValidatorIndexError] =
when sizeof(ValidatorIndex) == 4:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
# On x86 platform Nim allows only `int32` indexes, so all the indexes in
# range `2^31 <= x < 2^32` are not supported.
if uint64(value) <= uint64(high(int32)):
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.UnsupportedValue)
else:
err(ValidatorIndexError.TooHighValue)
elif sizeof(ValidatorIndex) == 8:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.TooHighValue)
else:
doAssert(false, "ValidatorIndex type size is incorrect")
func syncCommitteeParticipants*(forkedState: ForkedHashedBeaconState,
epoch: Epoch
): Result[seq[ValidatorPubKey], cstring] =

View File

@ -68,6 +68,9 @@ type
ValidatorQueryKind* {.pure.} = enum
Index, Key
ValidatorIndexError* {.pure.} = enum
UnsupportedValue, TooHighValue
ValidatorIdent* = object
case kind*: ValidatorQueryKind
of ValidatorQueryKind.Index:
@ -1009,3 +1012,23 @@ func init*(t: typedesc[RestSignedContributionAndProof],
message.contribution
),
signature: signature)
func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex,
ValidatorIndexError] =
when sizeof(ValidatorIndex) == 4:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
# On x86 platform Nim allows only `int32` indexes, so all the indexes in
# range `2^31 <= x < 2^32` are not supported.
if uint64(value) <= uint64(high(int32)):
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.UnsupportedValue)
else:
err(ValidatorIndexError.TooHighValue)
elif sizeof(ValidatorIndex) == 8:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.TooHighValue)
else:
doAssert(false, "ValidatorIndex type size is incorrect")

View File

@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[tables, os, sets, sequtils, strutils, uri],
std/[tables, os, sets, sequtils, strutils, uri, algorithm],
stew/[base10, results, byteutils],
bearssl/rand, chronos, presto, presto/client as presto_client,
chronicles, confutils,
@ -60,6 +60,8 @@ type
client*: ValidatorClientRef
DutiesServiceRef* = ref object of ClientServiceRef
pollingAttesterDutiesTask*: Future[void]
pollingSyncDutiesTask*: Future[void]
FallbackServiceRef* = ref object of ClientServiceRef
changesEvent*: AsyncEvent
@ -81,10 +83,14 @@ type
data*: RestAttesterDuty
slotSig*: Opt[ValidatorSig]
SyncCommitteeDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
validator_sync_committee_index*: IndexInSyncCommittee
SyncCommitteeDuty* = RestSyncCommitteeDuty
SlotProofsArray* = array[SLOTS_PER_EPOCH, Opt[ValidatorSig]]
SyncDutyAndProof* = object
epoch*: Epoch
data*: SyncCommitteeDuty
slotSigs*: Table[SyncSubcommitteeIndex, SlotProofsArray]
SyncCommitteeSubscriptionInfo* = object
validator_index*: ValidatorIndex
@ -122,7 +128,7 @@ type
duties*: Table[Epoch, DutyAndProof]
EpochSyncDuties* = object
duties*: Table[Epoch, SyncCommitteeDuty]
duties*: Table[Epoch, SyncDutyAndProof]
RestBeaconNodeStatus* {.pure.} = enum
Offline, ## BN is offline.
@ -211,6 +217,7 @@ type
const
DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH)
DefaultSyncDutyAndProof* = SyncDutyAndProof(epoch: FAR_FUTURE_EPOCH)
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
AllBeaconNodeRoles* = {
@ -394,7 +401,7 @@ chronicles.expandIt(RestAttesterDuty):
chronicles.expandIt(SyncCommitteeDuty):
pubkey = shortLog(it.pubkey)
validator_index = it.validator_index
validator_sync_committee_index = it.validator_sync_committee_index
validator_sync_committee_indices = it.validator_sync_committee_indices
proc checkConfig*(info: RestSpecVC): bool =
# /!\ Keep in sync with `spec/eth2_apis/rest_types.nim` > `RestSpecVC`.
@ -500,11 +507,14 @@ proc stop*(csr: ClientServiceRef) {.async.} =
csr.state = ServiceState.Closed
debug "Service stopped", service = csr.name
proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
func isDefault*(dap: DutyAndProof): bool =
dap.epoch == FAR_FUTURE_EPOCH
proc isDefault*(prd: ProposedData): bool =
prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
func isDefault*(prd: ProposedData): bool =
prd.epoch == FAR_FUTURE_EPOCH
func isDefault*(sdap: SyncDutyAndProof): bool =
sdap.epoch == FAR_FUTURE_EPOCH
proc parseRoles*(data: string): Result[set[BeaconNodeRole], cstring] =
var res: set[BeaconNodeRole]
@ -622,6 +632,10 @@ proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest,
DutyAndProof(epoch: epoch, dependentRoot: dependentRoot, data: duty,
slotSig: slotSig)
proc init*(t: typedesc[SyncDutyAndProof], epoch: Epoch,
duty: SyncCommitteeDuty): SyncDutyAndProof =
SyncDutyAndProof(epoch: epoch, data: duty)
proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest,
data: openArray[ProposerTask]): ProposedData =
ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data)
@ -638,17 +652,16 @@ proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
## Returns all `DutyAndProof` for the given `slot`.
var res: seq[DutyAndProof]
let epoch = slot.epoch()
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
if duty.data.slot == slot:
res.add(duty)
for key, item in mpairs(vc.attesters):
item.duties.withValue(epoch, duty):
if duty[].data.slot == slot:
res.add(duty[])
res
proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[SyncCommitteeDuty] =
## Returns all `SyncCommitteeDuty` for the given `slot`.
var res: seq[SyncCommitteeDuty]
slot: Slot): seq[SyncDutyAndProof] =
## Returns all `SyncDutyAndProof` for the given `slot`.
var res: seq[SyncDutyAndProof]
let epoch = slot.epoch()
for key, item in mpairs(vc.syncCommitteeDuties):
item.duties.withValue(epoch, duty):
@ -698,24 +711,26 @@ iterator attesterDutiesForEpoch*(vc: ValidatorClientRef,
if not(isDefault(epochDuties)):
yield epochDuties
iterator syncDutiesForEpoch*(vc: ValidatorClientRef,
epoch: Epoch): SyncDutyAndProof =
for key, item in vc.syncCommitteeDuties:
let epochDuties = item.duties.getOrDefault(epoch)
if not(isDefault(epochDuties)):
yield epochDuties
proc syncMembersSubscriptionInfoForEpoch*(
vc: ValidatorClientRef,
epoch: Epoch): seq[SyncCommitteeSubscriptionInfo] =
epoch: Epoch
): seq[SyncCommitteeSubscriptionInfo] =
var res: seq[SyncCommitteeSubscriptionInfo]
for key, item in mpairs(vc.syncCommitteeDuties):
var cur: SyncCommitteeSubscriptionInfo
var initialized = false
item.duties.withValue(epoch, epochDuties):
if not initialized:
cur.validator_index = epochDuties.validator_index
initialized = true
cur.validator_sync_committee_indices.add(
epochDuties.validator_sync_committee_index)
if initialized:
res.add cur
for mitem in mvalues(vc.syncCommitteeDuties):
mitem.duties.withValue(epoch, epochDuties):
res.add(
SyncCommitteeSubscriptionInfo(
validator_index:
epochDuties[].data.validator_index,
validator_sync_committee_indices:
epochDuties[].data.validator_sync_committee_indices))
res
proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
@ -851,19 +866,6 @@ proc isExpired*(vc: ValidatorClientRef,
else:
true
func getSelections*(
daps: openArray[DutyAndProof]
): seq[RestBeaconCommitteeSelection] =
var res: seq[RestBeaconCommitteeSelection]
for item in daps:
if item.slotSig.isSome():
res.add(RestBeaconCommitteeSelection(
validator_index: RestValidatorIndex(item.data.validator_index),
slot: item.data.slot,
selection_proof: item.slotSig.get()
))
res
proc getValidatorRegistration(
vc: ValidatorClientRef,
validator: AttachedValidator,
@ -1097,3 +1099,29 @@ proc checkedWaitForNextSlot*(vc: ValidatorClientRef, curSlot: Opt[Slot],
nextSlot = currentSlot + 1
vc.checkedWaitForSlot(nextSlot, offset, showLogs)
func cmpUnsorted*[T](a, b: openArray[T]): bool =
if len(a) != len(b): return false
return
case len(a)
of 0:
true
of 1:
a[0] == b[0]
of 2:
((a[0] == b[0]) and (a[1] == b[1])) or
((a[0] == b[1]) and (a[1] == b[0]))
else:
let asorted = sorted(a)
let bsorted = sorted(b)
for index, item in asorted.pairs():
if item != bsorted[index]:
return false
true
func `==`*(a, b: SyncDutyAndProof): bool =
(a.epoch == b.epoch) and
(a.data.pubkey == b.data.pubkey) and
(a.data.validator_index == b.data.validator_index) and
cmpUnsorted(a.data.validator_sync_committee_indices,
b.data.validator_sync_committee_indices)

View File

@ -5,7 +5,7 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import std/[sets, sequtils]
import std/[sets, sequtils, algorithm]
import chronicles
import common, api, block_service
@ -20,6 +20,22 @@ type
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop,
ProposerPreparationLoop, ValidatorRegisterLoop
SyncIndexTable = Table[SyncSubcommitteeIndex, SlotProofsArray]
SyncValidatorAndDuty = object
validator: AttachedValidator
duty: SyncDutyAndProof
AttestationSlotRequest = object
validator: AttachedValidator
slot: Slot
SyncSlotRequest = object
validator: AttachedValidator
slot: Slot
validatorSyncCommitteeIndex: IndexInSyncCommittee
validatorSubCommitteeIndex: SyncSubcommitteeIndex
chronicles.formatIt(DutiesServiceLoop):
case it
of AttesterLoop: "attester_loop"
@ -29,6 +45,9 @@ chronicles.formatIt(DutiesServiceLoop):
of ProposerPreparationLoop: "proposer_prepare_loop"
of ValidatorRegisterLoop: "validator_register_loop"
proc cmp(x, y: AttestationSlotRequest): int =
if x.slot == y.slot: 0 elif x.slot < y.slot: -1 else: 1
proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
(uint64(duty.committee_index) < MAX_COMMITTEES_PER_SLOT) and
@ -109,6 +128,147 @@ proc pollForValidatorIndices*(service: DutiesServiceRef) {.async.} =
updated_validators = updated
vc.indicesAvailable.fire()
proc fillAttestationSlotSignatures*(
service: DutiesServiceRef,
epochPeriods: seq[Epoch]
) {.async.} =
let
vc = service.client
genesisRoot = vc.beaconGenesis.genesis_validators_root
requests =
block:
var res: seq[AttestationSlotRequest]
for epoch in epochPeriods:
for duty in vc.attesterDutiesForEpoch(epoch):
if duty.slotSig.isNone():
let validator =
vc.attachedValidators[].
getValidator(duty.data.pubkey).valueOr:
continue
res.add(AttestationSlotRequest(
validator: validator,
slot: duty.data.slot
))
# We make requests sorted by slot number.
sorted(res, cmp, order = SortOrder.Ascending)
pendingRequests =
block:
var res: seq[Future[SignatureResult]]
for request in requests:
let fork = vc.forkAtEpoch(request.slot.epoch())
res.add(request.validator.getSlotSignature(fork, genesisRoot,
request.slot))
res
try:
# TODO (cheatfate): Here we waiting for all signatures, but when remote
# signer is used we could try different approach.
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
for index, fut in pendingRequests.pairs():
let
request = requests[index]
signature =
if fut.done():
let sres = fut.read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = request.slot.epoch(),
slot = request.slot
Opt.none(ValidatorSig)
else:
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
vc.attesters.withValue(request.validator.pubkey, map):
map[].duties.withValue(request.slot.epoch(), dap):
dap[].slotSig = signature
if vc.config.distributedEnabled:
var indexToKey: Table[ValidatorIndex, Opt[ValidatorPubKey]]
let selections =
block:
var sres: seq[RestBeaconCommitteeSelection]
for epoch in epochPeriods:
for duty in vc.attesterDutiesForEpoch(epoch):
# We only use duties which has slot signature filled, because
# middleware needs it to create aggregated signature.
if duty.slotSig.isSome():
let
validator = vc.attachedValidators[].getValidator(
duty.data.pubkey).valueOr:
continue
vindex = validator.index.valueOr:
continue
indexToKey[vindex] = Opt.some(validator.pubkey)
sres.add(RestBeaconCommitteeSelection(
validator_index: RestValidatorIndex(vindex),
slot: duty.data.slot,
selection_proof: duty.slotSig.get()
))
sres
if len(selections) == 0: return
let sresponse =
try:
# Query middleware for aggregated signatures.
await vc.submitBeaconCommitteeSelections(selections,
ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to submit beacon committee selections",
reason = exc.getFailureReason()
return
except CancelledError as exc:
debug "Beacon committee selections processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while trying to submit beacon " &
"committee selections", reason = exc.msg, error = exc.name
return
for selection in sresponse.data:
let
vindex = selection.validator_index.toValidatorIndex().valueOr:
warn "Invalid validator_index value encountered while processing " &
"beacon committee selections",
validator_index = uint64(selection.validator_index),
reason = $error
continue
selectionProof = selection.selection_proof.load().valueOr:
warn "Invalid signature encountered while processing " &
"beacon committee selections",
validator_index = vindex,
slot = selection.slot,
selection_proof = shortLog(selection.selection_proof)
continue
validator =
block:
# Selections operating using validator indices, so we should check
# if we have such validator index in our validator's pool and it
# still in place (not removed using keystore manager).
let key = indexToKey.getOrDefault(vindex)
if key.isNone():
warn "Non-existing validator encountered while processing " &
"beacon committee selections",
validator_index = vindex,
slot = selection.slot,
selection_proof = shortLog(selection.selection_proof)
continue
vc.attachedValidators[].getValidator(key.get()).valueOr:
continue
vc.attesters.withValue(validator.pubkey, map):
map[].duties.withValue(selection.slot.epoch(), dap):
dap[].slotSig = Opt.some(selectionProof.toValidatorSig())
proc pollForAttesterDuties*(service: DutiesServiceRef,
epoch: Epoch): Future[int] {.async.} =
let vc = service.client
@ -118,7 +278,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef,
return 0
var duties: seq[RestAttesterDuty]
var currentRoot: Option[Eth2Digest]
var currentRoot: Opt[Eth2Digest]
var offset = 0
while offset < len(validatorIndices):
@ -151,7 +311,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef,
if currentRoot.isNone():
# First request
currentRoot = some(res.dependent_root)
currentRoot = Opt.some(res.dependent_root)
else:
if currentRoot.get() != res.dependent_root:
# `dependent_root` must be equal for all requests/response, if it got
@ -159,7 +319,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef,
# should re-request all queries again.
offset = 0
duties.setLen(0)
currentRoot = none[Eth2Digest]()
currentRoot = Opt.none(Eth2Digest)
continue
for item in res.data:
@ -194,108 +354,11 @@ proc pollForAttesterDuties*(service: DutiesServiceRef,
res.add((epoch, duty))
res
if len(addOrReplaceItems) > 0:
var pendingRequests: seq[Future[SignatureResult]]
var validators: seq[AttachedValidator]
for item in addOrReplaceItems:
let
validator =
vc.attachedValidators[].getValidator(item.duty.pubkey).valueOr:
continue
fork = vc.forkAtEpoch(item.duty.slot.epoch)
future = validator.getSlotSignature(fork, genesisRoot, item.duty.slot)
pendingRequests.add(future)
validators.add(validator)
try:
await allFutures(pendingRequests)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
raise exc
let daps =
block:
var res: seq[DutyAndProof]
for index, fut in pendingRequests:
let
item = addOrReplaceItems[index]
dap =
if fut.done():
let sigRes = fut.read()
if sigRes.isErr():
warn "Unable to create slot signature using remote signer",
validator = shortLog(validators[index]),
error_msg = sigRes.error()
DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
none[ValidatorSig]())
else:
DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
some(sigRes.get()))
else:
DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
none[ValidatorSig]())
res.add(dap)
res
for item in daps:
# Update VC attesters registry with current version of DutyAndProof.
vc.attesters.mgetOrPut(item.data.pubkey,
default(EpochDuties)).duties[item.epoch] = item
if vc.config.distributedEnabled:
let selections = daps.getSelections()
if len(selections) == 0:
return len(addOrReplaceItems)
let sresponse =
try:
# Query middleware for aggregated signatures.
await vc.submitBeaconCommitteeSelections(selections,
ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to submit beacon committee selections", epoch = epoch,
reason = exc.getFailureReason()
return 0
except CancelledError as exc:
debug "Beacon committee selections processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while trying to submit beacon " &
"committee selections", epoch = epoch, err_name = exc.name,
err_msg = exc.msg
return 0
for selection in sresponse.data:
let selectionProof = selection.selection_proof.load().valueOr:
warn "Invalid signature encountered while processing beacon " &
"committee selections",
validator_index = ValidatorIndex(selection.validator_index),
slot = selection.slot,
selection_proof = shortLog(selection.selection_proof)
continue
let dres =
block:
var res: Opt[DutyAndProof]
for dap in daps:
if (uint64(dap.data.validator_index) ==
uint64(selection.validator_index)) and
(dap.data.slot == selection.slot):
var ndap = dap
ndap.slotSig = some(selection.selection_proof)
res = Opt.some(ndap)
break
res
if dres.isSome():
# Update VC attesters registry with new aggregated DutyAndProof.
let dap = dres.get()
vc.attesters.mgetOrPut(
dap.data.pubkey, default(EpochDuties)).duties[dap.epoch] = dap
for item in addOrReplaceItems:
let dap = DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
Opt.none(ValidatorSig))
vc.attesters.mgetOrPut(dap.data.pubkey,
default(EpochDuties)).duties[dap.epoch] = dap
return len(addOrReplaceItems)
@ -312,6 +375,174 @@ proc pruneSyncCommitteeDuties*(service: DutiesServiceRef, slot: Slot) =
newSyncCommitteeDuties[key] = currentPeriodDuties
vc.syncCommitteeDuties = newSyncCommitteeDuties
proc fillSyncSlotSignatures*(
service: DutiesServiceRef,
epochPeriods: seq[Epoch]
) {.async.} =
let
vc = service.client
genesisRoot = vc.beaconGenesis.genesis_validators_root
validatorDuties =
block:
var res: seq[SyncValidatorAndDuty]
for epoch in epochPeriods:
for duty in vc.syncDutiesForEpoch(epoch):
if len(duty.slotSigs) == 0:
let validator = vc.attachedValidators[].getValidator(
duty.data.pubkey).valueOr:
continue
res.add(
SyncValidatorAndDuty(
validator: validator,
duty: duty))
res
(pendingRequests, pendingIds) =
block:
var
sres: seq[Future[SignatureResult]]
ires: seq[SyncSlotRequest]
for epoch in epochPeriods:
let fork = vc.forkAtEpoch(epoch)
for slot in epoch.slots():
for item in validatorDuties:
for syncCommitteeIndex in
item.duty.data.validator_sync_committee_indices:
let subCommitteeIndex = getSubcommitteeIndex(syncCommitteeIndex)
sres.add(
item.validator.getSyncCommitteeSelectionProof(
fork, genesisRoot, slot, subCommitteeIndex))
ires.add(
SyncSlotRequest(
validator: item.validator,
slot: slot,
validatorSyncCommitteeIndex: syncCommitteeIndex,
validatorSubCommitteeIndex: subCommitteeIndex))
(sres, ires)
try:
# TODO (cheatfate): Here we waiting for all signature requests, but we could
# perform waiting slot by slot, so all the tasks from early slot could
# start working without delays.
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
for index, fut in pendingRequests.pairs():
let
pid = pendingIds[index]
epoch = pid.slot.epoch()
slotIndex = pid.slot - epoch.start_slot()
signature =
if fut.done():
let sres = fut.read()
if sres.isErr():
warn "Unable to create slot proof using remote signer",
reason = sres.error(), epoch = epoch,
slot = pid.slot, slot_index = slotIndex,
validator = shortLog(pid.validator)
Opt.none(ValidatorSig)
else:
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
vc.syncCommitteeDuties.withValue(pid.validator.pubkey, map):
map[].duties.withValue(epoch, sdap):
sdap[].slotSigs.withValue(pid.validatorSubCommitteeIndex, proofs):
proofs[][slotIndex] = signature
if vc.config.distributedEnabled:
var indexToKey: Table[ValidatorIndex, Opt[ValidatorPubKey]]
let selections =
block:
var sres: seq[RestSyncCommitteeSelection]
for epoch in epochPeriods:
for duty in vc.syncDutiesForEpoch(epoch):
let
validator = vc.attachedValidators[].getValidator(
duty.data.pubkey).valueOr:
continue
vindex = validator.index.valueOr:
continue
startSlot = duty.epoch.start_slot()
indexToKey[vindex] = Opt.some(validator.pubkey)
for subCommitteeIndex, proofs in duty.slotSigs.pairs():
for slotIndex, selection_proof in proofs.pairs():
if selection_proof.isNone(): continue
sres.add(RestSyncCommitteeSelection(
validator_index: RestValidatorIndex(vindex),
slot: startSlot + slotIndex,
subcommittee_index: uint64(subCommitteeIndex),
selection_proof: selection_proof.get()
))
sres
if len(selections) == 0: return
let sresponse =
try:
# Query middleware for aggregated signatures.
await vc.submitSyncCommitteeSelections(selections,
ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to submit sync committee selections",
reason = exc.getFailureReason()
return
except CancelledError as exc:
debug "Beacon committee selections processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while trying to submit sync " &
"committee selections", reason = exc.msg, error = exc.name
return
for selection in sresponse.data:
let
vindex = selection.validator_index.toValidatorIndex().valueOr:
warn "Invalid validator_index value encountered while processing " &
"sync committee selections",
validator_index = uint64(selection.validator_index),
reason = $error
continue
selectionProof = selection.selection_proof.load().valueOr:
warn "Invalid signature encountered while processing " &
"sync committee selections",
validator_index = vindex,
slot = selection.slot,
subcommittee_index = selection.subcommittee_index,
selection_proof = shortLog(selection.selection_proof)
continue
epoch = selection.slot.epoch()
slotIndex = selection.slot - epoch.start_slot()
# Position in our slot_proofs array
subCommitteeIndex = SyncSubcommitteeIndex(selection.subcommittee_index)
validator =
block:
# Selections operating using validator indices, so we should check
# if we have such validator index in our validator's pool and it
# still in place (not removed using keystore manager).
let key = indexToKey.getOrDefault(vindex)
if key.isNone():
warn "Non-existing validator encountered while processing " &
"sync committee selections",
validator_index = vindex,
slot = selection.slot,
subcommittee_index = selection.subcommittee_index,
selection_proof = shortLog(selection.selection_proof)
continue
vc.attachedValidators[].getValidator(key.get()).valueOr:
continue
vc.syncCommitteeDuties.withValue(validator.pubkey, map):
map[].duties.withValue(epoch, sdap):
sdap[].slotSigs.withValue(subCommitteeIndex, proofs):
proofs[][slotIndex] = Opt.some(selectionProof.toValidatorSig())
proc pollForSyncCommitteeDuties*(service: DutiesServiceRef,
epoch: Epoch): Future[int] {.async.} =
let vc = service.client
@ -351,45 +582,30 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef,
remainingItems -= arraySize
let
relevantDuties =
block:
var res: seq[SyncCommitteeDuty]
for duty in filteredDuties:
for validatorSyncCommitteeIndex in
duty.validator_sync_committee_indices:
res.add(SyncCommitteeDuty(
pubkey: duty.pubkey,
validator_index: duty.validator_index,
validator_sync_committee_index: validatorSyncCommitteeIndex))
res
relevantSdaps = filteredDuties.mapIt(SyncDutyAndProof.init(epoch, it))
fork = vc.forkAtEpoch(epoch)
let addOrReplaceItems =
block:
var alreadyWarned = false
var res: seq[tuple[epoch: Epoch, duty: SyncCommitteeDuty]]
for duty in relevantDuties:
var dutyFound = false
vc.syncCommitteeDuties.withValue(duty.pubkey, map):
map.duties.withValue(epoch, epochDuty):
if epochDuty[] != duty:
dutyFound = true
if dutyFound and not alreadyWarned:
info "Sync committee duties re-organization", duty, epoch
alreadyWarned = true
res.add((epoch, duty))
res
addOrReplaceItems =
block:
var
alreadyWarned = false
res: seq[tuple[epoch: Epoch, duty: SyncDutyAndProof]]
for sdap in relevantSdaps:
var dutyFound = false
vc.syncCommitteeDuties.withValue(sdap.data.pubkey, map):
map.duties.withValue(epoch, epochDuty):
if epochDuty[] != sdap:
dutyFound = true
if dutyFound and not(alreadyWarned):
info "Sync committee duties re-organization", sdap, epoch
alreadyWarned = true
res.add((epoch, sdap))
res
if len(addOrReplaceItems) > 0:
for epoch, duty in items(addOrReplaceItems):
var validatorDuties =
vc.syncCommitteeDuties.getOrDefault(duty.pubkey)
validatorDuties.duties[epoch] = duty
vc.syncCommitteeDuties[duty.pubkey] = validatorDuties
for epoch, sdap in items(addOrReplaceItems):
vc.syncCommitteeDuties.mgetOrPut(sdap.data.pubkey,
default(EpochSyncDuties)).duties[epoch] = sdap
return len(addOrReplaceItems)
@ -431,6 +647,8 @@ proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} =
if (counts[0].count == 0) and (counts[1].count == 0):
debug "No new attester's duties received", slot = currentSlot
await service.fillAttestationSlotSignatures(@[currentEpoch, nextEpoch])
let subscriptions =
block:
var res: seq[RestCommitteeSubscription]
@ -490,22 +708,26 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} =
)
res
(counts, total) =
(counts, epochs, total) =
block:
var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod,
count: int]]
var periods: seq[Epoch]
var total = 0
if len(dutyPeriods) > 0:
for (epoch, period) in dutyPeriods:
let count = await service.pollForSyncCommitteeDuties(epoch)
res.add((epoch: epoch, period: period, count: count))
periods.add(epoch)
total += count
(res, total)
(res, periods, total)
if total == 0:
debug "No new sync committee member's duties received",
slot = currentSlot
await service.fillSyncSlotSignatures(epochs)
let subscriptions =
block:
var res: seq[RestSyncCommitteeSubscription]
@ -668,8 +890,13 @@ proc attesterDutiesLoop(service: DutiesServiceRef) {.async.} =
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
await service.pollForAttesterDuties()
await service.waitForNextSlot(AttesterLoop)
# Cleaning up previous attestation duties task.
if not(isNil(service.pollingAttesterDutiesTask)) and
not(service.pollingAttesterDutiesTask.finished()):
await cancelAndWait(service.pollingAttesterDutiesTask)
# Spawning new attestation duties task.
service.pollingAttesterDutiesTask = service.pollForAttesterDuties()
proc proposerDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
@ -727,8 +954,13 @@ proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} =
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
await service.pollForSyncCommitteeDuties()
await service.waitForNextSlot(SyncCommitteeLoop)
# Cleaning up previous attestation duties task.
if not(isNil(service.pollingSyncDutiesTask)) and
not(service.pollingSyncDutiesTask.finished()):
await cancelAndWait(service.pollingSyncDutiesTask)
# Spawning new attestation duties task.
service.pollingSyncDutiesTask = service.pollForAttesterDuties()
template checkAndRestart(serviceLoop: DutiesServiceLoop,
future: Future[void], body: untyped): untyped =
@ -803,6 +1035,12 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
pending.add(prepareFut.cancelAndWait())
if not(isNil(registerFut)) and not(registerFut.finished()):
pending.add(registerFut.cancelAndWait())
if not(isNil(service.pollingAttesterDutiesTask)) and
not(service.pollingAttesterDutiesTask.finished()):
pending.add(service.pollingAttesterDutiesTask.cancelAndWait())
if not(isNil(service.pollingSyncDutiesTask)) and
not(service.pollingSyncDutiesTask.finished()):
pending.add(service.pollingSyncDutiesTask.cancelAndWait())
await allFutures(pending)
true
except CatchableError as exc:

View File

@ -25,19 +25,27 @@ type
validator: AttachedValidator
subcommitteeIdx: SyncSubcommitteeIndex
ValidatorAndSig* = object
validator: AttachedValidator
signature: ValidatorSig
proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
slot: Slot, beaconBlockRoot: Eth2Digest,
duty: SyncCommitteeDuty): Future[bool] {.
duty: SyncCommitteeDuty,
index: IndexInSyncCommittee): Future[bool] {.
async.} =
let
vc = service.client
fork = vc.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
vindex = duty.validator_index
subcommitteeIdx = getSubcommitteeIndex(
duty.validator_sync_committee_index)
validator = vc.getValidatorForDuties(
duty.pubkey, slot, slashingSafe = true).valueOr: return false
subcommitteeIdx = getSubcommitteeIndex(index)
logScope:
validator = shortLog(validator)
let
message =
block:
let res = await getSyncCommitteeMessage(validator, fork,
@ -50,8 +58,8 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
return
res.get()
debug "Sending sync committee message", message = shortLog(message),
validator = shortLog(validator), validator_index = vindex,
debug "Sending sync committee message",
message = shortLog(message),
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
let res =
@ -60,18 +68,14 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
except ValidatorApiError as exc:
warn "Unable to publish sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
reason = exc.getFailureReason()
return false
except CancelledError:
except CancelledError as exc:
debug "Publish sync committee message request was interrupted"
return false
raise exc
except CatchableError as exc:
error "Unexpected error occurred while publishing sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
@ -81,32 +85,32 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
notice "Sync committee message published",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
delay = delay
else:
warn "Sync committee message was not accepted by beacon node",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex, delay = delay
delay = delay
return res
proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncCommitteeDuty])
duties: seq[SyncDutyAndProof])
{.async.} =
let vc = service.client
let pendingSyncCommitteeMessages =
block:
var res: seq[Future[bool]]
for duty in duties:
for sdap in duties:
let duty = sdap.data
debug "Serving sync message duty", duty, epoch = slot.epoch()
res.add(service.serveSyncCommitteeMessage(slot,
beaconBlockRoot,
duty))
for syncCommitteeIndex in duty.validator_sync_committee_indices:
res.add(service.serveSyncCommitteeMessage(slot,
beaconBlockRoot,
duty,
syncCommitteeIndex))
res
let statistics =
@ -180,9 +184,9 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
err_msg = exc.msg,
reason = exc.getFailureReason()
false
except CancelledError:
except CancelledError as exc:
debug "Publish sync contribution request was interrupted"
return false
raise exc
except CatchableError as err:
error "Unexpected error occurred while publishing sync contribution",
contribution = shortLog(proof.contribution),
@ -205,73 +209,61 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncCommitteeDuty]) {.async.} =
duties: seq[SyncDutyAndProof]) {.async.} =
let
vc = service.client
epoch = slot.epoch
epoch = slot.epoch()
fork = vc.forkAtEpoch(epoch)
var slotSignatureReqs: seq[Future[SignatureResult]]
var validators: seq[(AttachedValidator, SyncSubcommitteeIndex)]
var
contributionFuts: array[SYNC_COMMITTEE_SUBNET_COUNT,
Future[SyncCommitteeContribution]]
for duty in duties:
let
validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr:
continue
subCommitteeIdx =
getSubcommitteeIndex(duty.validator_sync_committee_index)
future = validator.getSyncCommitteeSelectionProof(
fork,
vc.beaconGenesis.genesis_validators_root,
slot,
subCommitteeIdx)
let validatorContributions =
block:
var res: seq[ContributionItem]
for sdap in duties:
let duty = sdap.data
for syncCommitteeIndex in duty.validator_sync_committee_indices:
let
subCommitteeIndex = getSubcommitteeIndex(syncCommitteeIndex)
slotIndex = slot - slot.epoch().start_slot()
signature =
block:
let signatures = sdap.slotSigs.getOrDefault(subCommitteeIndex)
if signatures[slotIndex].isNone():
continue
signatures[slotIndex].get()
validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr:
continue
vindex = validator.index.valueOr:
continue
if is_sync_committee_aggregator(signature):
res.add(ContributionItem(
aggregator_index: uint64(vindex),
selection_proof: signature,
validator: validator,
subcommitteeIdx: subCommitteeIndex
))
if isNil(contributionFuts[int(subCommitteeIndex)]):
contributionFuts[int(subCommitteeIndex)] =
vc.produceSyncCommitteeContribution(
slot, subCommitteeIndex, beaconBlockRoot,
ApiStrategyKind.Best)
res
slotSignatureReqs.add(future)
validators.add((validator, subCommitteeIdx))
let pendingFutures = contributionFuts.filterIt(not(isNil(it)))
try:
await allFutures(slotSignatureReqs)
await allFutures(pendingFutures)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
for future in slotSignatureReqs:
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
var pending: seq[Future[void]]
for fut in pendingFutures:
if not(fut.finished()):
pending.add(fut.cancelAndWait())
await allFutures(pending)
raise exc
var
contributionsFuts: array[SYNC_COMMITTEE_SUBNET_COUNT,
Future[SyncCommitteeContribution]]
let validatorContributions = block:
var res: seq[ContributionItem]
for idx, fut in slotSignatureReqs:
if fut.completed:
let
sigRes = fut.read
validator = validators[idx][0]
subCommitteeIdx = validators[idx][1]
if sigRes.isErr():
warn "Unable to create slot signature using remote signer",
validator = shortLog(validator),
error_msg = sigRes.error()
elif validator.index.isSome and
is_sync_committee_aggregator(sigRes.get):
res.add ContributionItem(
aggregator_index: uint64(validator.index.get),
selection_proof: sigRes.get,
validator: validator,
subcommitteeIdx: subCommitteeIdx)
if isNil(contributionsFuts[subCommitteeIdx]):
contributionsFuts[int subCommitteeIdx] =
vc.produceSyncCommitteeContribution(
slot,
subCommitteeIdx,
beaconBlockRoot,
ApiStrategyKind.Best)
res
if len(validatorContributions) > 0:
let pendingAggregates =
block:
@ -279,22 +271,21 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
for item in validatorContributions:
let aggContribution =
try:
await contributionsFuts[item.subcommitteeIdx]
contributionFuts[item.subcommitteeIdx].read()
except ValidatorApiError as exc:
warn "Unable to get sync message contribution data", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot),
reason = exc.getFailureReason()
return
except CancelledError:
continue
except CancelledError as exc:
debug "Request for sync message contribution was interrupted"
return
raise exc
except CatchableError as exc:
error "Unexpected error occurred while getting sync message "&
"contribution", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot),
err_name = exc.name, err_msg = exc.msg
return
continue
let proof = ContributionAndProof(
aggregator_index: item.aggregator_index,
contribution: aggContribution,
@ -308,12 +299,12 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
var errored, succeed, failed = 0
try:
await allFutures(pendingAggregates)
except CancelledError as err:
except CancelledError as exc:
for fut in pendingAggregates:
if not(fut.finished()):
fut.cancel()
await allFutures(pendingAggregates)
raise err
raise exc
for future in pendingAggregates:
if future.completed():
@ -336,7 +327,7 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
slot: Slot,
duties: seq[SyncCommitteeDuty]) {.
duties: seq[SyncDutyAndProof]) {.
async.} =
let vc = service.client
@ -367,9 +358,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
warn "Unable to retrieve head block's root to sign", reason = exc.msg,
reason = exc.getFailureReason()
return
except CancelledError:
except CancelledError as exc:
debug "Block root request was interrupted"
return
raise exc
except CatchableError as exc:
error "Unexpected error while requesting sync message block root",
err_name = exc.name, err_msg = exc.msg, slot = slot
@ -383,9 +374,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
warn "Unable to proceed sync committee messages", slot = slot,
duties_count = len(duties), reason = exc.getFailureReason()
return
except CancelledError:
except CancelledError as exc:
debug "Sync committee producing process was interrupted"
return
raise exc
except CatchableError as exc:
error "Unexpected error while producing sync committee messages",
slot = slot,