Fix sync contributions did not get proper signatures issue.

Add chunked signature processing to both slot signatures and selection proofs.
This commit is contained in:
cheatfate 2023-05-16 14:14:59 +03:00
parent 7eb49b5948
commit cf6610c01b
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
1 changed files with 111 additions and 92 deletions

View File

@ -12,6 +12,8 @@ import common, api, block_service
const
ServiceName = "duties_service"
SUBSCRIPTION_LOOKAHEAD_EPOCHS* = 4'u64
SYNC_SIGNING_CHUNK_SIZE = 64
ATTESTATION_SIGNING_CHUNK_SIZE = 64
logScope: service = ServiceName
@ -28,11 +30,13 @@ type
AttestationSlotRequest = object
validator: AttachedValidator
fork: Fork
slot: Slot
SyncSlotRequest = object
validator: AttachedValidator
slot: Slot
fork: Fork
validatorSyncCommitteeIndex: IndexInSyncCommittee
validatorSubCommitteeIndex: SyncSubcommitteeIndex
@ -48,6 +52,13 @@ chronicles.formatIt(DutiesServiceLoop):
proc cmp(x, y: AttestationSlotRequest): int =
if x.slot == y.slot: 0 elif x.slot < y.slot: -1 else: 1
proc cmp(x, y: SyncSlotRequest): int =
if x.slot == y.slot: 0 elif x.slot < y.slot: -1 else: 1
iterator chunks*[T](data: openArray[T], maxCount: Positive): seq[T] =
for i in countup(0, len(data) - 1, maxCount):
yield @(data.toOpenArray(i, min(i + maxCount, len(data)) - 1))
proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
(uint64(duty.committee_index) < MAX_COMMITTEES_PER_SLOT) and
@ -140,56 +151,58 @@ proc fillAttestationSlotSignatures*(
var res: seq[AttestationSlotRequest]
for epoch in epochPeriods:
for duty in vc.attesterDutiesForEpoch(epoch):
if duty.slotSig.isNone():
let validator =
vc.attachedValidators[].
getValidator(duty.data.pubkey).valueOr:
continue
res.add(AttestationSlotRequest(
validator: validator,
slot: duty.data.slot
))
if duty.slotSig.isSome():
continue
let validator =
vc.attachedValidators[].
getValidator(duty.data.pubkey).valueOr:
continue
if validator.index.isNone():
continue
res.add(AttestationSlotRequest(
validator: validator,
slot: duty.data.slot,
fork: vc.forkAtEpoch(duty.data.slot.epoch())
))
# We make requests sorted by slot number.
sorted(res, cmp, order = SortOrder.Ascending)
pendingRequests =
block:
var res: seq[Future[SignatureResult]]
for request in requests:
let fork = vc.forkAtEpoch(request.slot.epoch())
res.add(request.validator.getSlotSignature(fork, genesisRoot,
request.slot))
res
try:
# TODO (cheatfate): Here we waiting for all signatures, but when remote
# signer is used we could try different approach.
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
# We creating signatures in chunks to make VC more responsive for big number
# of validators. In this case tasks which are running concurrently could
# get their slot signatures for first epoch's slots even before this
# processing ends.
for chunk in requests.chunks(ATTESTATION_SIGNING_CHUNK_SIZE):
let pendingRequests = chunk.mapIt(
getSlotSignature(it.validator, it.fork, genesisRoot, it.slot))
for index, fut in pendingRequests.pairs():
let
request = requests[index]
signature =
if fut.done():
let sres = fut.read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = request.slot.epoch(),
slot = request.slot
Opt.none(ValidatorSig)
try:
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
for index, fut in pendingRequests.pairs():
let
request = requests[index]
signature =
if fut.done():
let sres = fut.read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = request.slot.epoch(),
slot = request.slot
Opt.none(ValidatorSig)
else:
Opt.some(sres.get())
else:
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
Opt.none(ValidatorSig)
vc.attesters.withValue(request.validator.pubkey, map):
map[].duties.withValue(request.slot.epoch(), dap):
dap[].slotSig = signature
vc.attesters.withValue(request.validator.pubkey, map):
map[].duties.withValue(request.slot.epoch(), dap):
dap[].slotSig = signature
if vc.config.distributedEnabled:
var indexToKey: Table[ValidatorIndex, Opt[ValidatorPubKey]]
@ -397,11 +410,9 @@ proc fillSyncSlotSignatures*(
validator: validator,
duty: duty))
res
(pendingRequests, pendingIds) =
requests =
block:
var
sres: seq[Future[SignatureResult]]
ires: seq[SyncSlotRequest]
var res: seq[SyncSlotRequest]
for epoch in epochPeriods:
let fork = vc.forkAtEpoch(epoch)
for slot in epoch.slots():
@ -409,52 +420,60 @@ proc fillSyncSlotSignatures*(
for syncCommitteeIndex in
item.duty.data.validator_sync_committee_indices:
let subCommitteeIndex = getSubcommitteeIndex(syncCommitteeIndex)
sres.add(
item.validator.getSyncCommitteeSelectionProof(
fork, genesisRoot, slot, subCommitteeIndex))
ires.add(
SyncSlotRequest(
validator: item.validator,
slot: slot,
validatorSyncCommitteeIndex: syncCommitteeIndex,
validatorSubCommitteeIndex: subCommitteeIndex))
(sres, ires)
res.add(SyncSlotRequest(
validator: item.validator,
slot: slot,
fork: fork,
validatorSyncCommitteeIndex: syncCommitteeIndex,
validatorSubCommitteeIndex: subCommitteeIndex))
res
try:
# TODO (cheatfate): Here we waiting for all signature requests, but we could
# perform waiting slot by slot, so all the tasks from early slot could
# start working without delays.
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
# We creating signatures in chunks to make VC more responsive for big number
# of validators. In this case tasks which are running concurrently could
# get their slot signatures for first epoch's slots even before this
# processing ends.
for chunk in requests.chunks(SYNC_SIGNING_CHUNK_SIZE):
let pendingRequests = chunk.mapIt(
getSyncCommitteeSelectionProof(
it.validator, it.fork, genesisRoot, it.slot,
it.validatorSubCommitteeIndex))
for index, fut in pendingRequests.pairs():
let
pid = pendingIds[index]
epoch = pid.slot.epoch()
slotIndex = pid.slot - epoch.start_slot()
signature =
if fut.done():
let sres = fut.read()
if sres.isErr():
warn "Unable to create slot proof using remote signer",
reason = sres.error(), epoch = epoch,
slot = pid.slot, slot_index = slotIndex,
validator = shortLog(pid.validator)
Opt.none(ValidatorSig)
try:
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
for index, fut in pendingRequests.pairs():
let
request = chunk[index]
epoch = request.slot.epoch()
slotIndex = request.slot - epoch.start_slot()
signature =
if fut.done():
let sres = fut.read()
if sres.isErr():
warn "Unable to create slot proof using remote signer",
reason = sres.error(), epoch = epoch,
slot = request.slot, slot_index = slotIndex,
validator = shortLog(request.validator)
Opt.none(ValidatorSig)
else:
Opt.some(sres.get())
else:
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
Opt.none(ValidatorSig)
vc.syncCommitteeDuties.withValue(pid.validator.pubkey, map):
map[].duties.withValue(epoch, sdap):
sdap[].slotSigs.withValue(pid.validatorSubCommitteeIndex, proofs):
proofs[][slotIndex] = signature
vc.syncCommitteeDuties.withValue(request.validator.pubkey, map):
map[].duties.withValue(epoch, sdap):
# We can't use `withValue` here, because `sdap` has an empty Table
# after initialized.
sdap[].slotSigs.mgetOrPut(
request.validatorSubCommitteeIndex,
default(SlotProofsArray)
)[slotIndex] = signature
if vc.config.distributedEnabled:
var indexToKey: Table[ValidatorIndex, Opt[ValidatorPubKey]]
@ -655,7 +674,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} =
block:
let moment = Moment.now()
await service.fillAttestationSlotSignatures(@[currentEpoch, nextEpoch])
notice "Slot signatures has been obtained", time = (Moment.now() - moment)
debug "Slot signatures has been obtained", time = (Moment.now() - moment)
let subscriptions =
block:
@ -737,7 +756,7 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} =
block:
let moment = Moment.now()
await service.fillSyncSlotSignatures(epochs)
notice "Sync selection proofs has been obtained",
debug "Sync selection proofs has been obtained",
time = (Moment.now() - moment)
let subscriptions =