Mirror of https://github.com/status-im/nimbus-eth2.git (synced 2025-01-10 14:26:26 +00:00)
Batch slashing protection registration (#5604)
This PR brings down the time to send 100 attestations from ~1s to ~100ms, making it feasible to run 10k validators on a single node (which regularly sends 300 attestations per slot). This is done by batching the slashing protection database writes into a single transaction, thus avoiding a slow fsync for every signature - the effect is more pronounced on slow drives. The benefit applies both to validators attached to the beacon node and to those running in the validator client.
parent d8144c6de1 · commit e1e809eeb7
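The core of the change is to wrap a whole batch of slashing protection registrations in one SQLite transaction, so the database pays a single fsync per batch instead of one per signature. Below is a minimal, self-contained sketch of that pattern (not part of the diff): ToyDb, its log field and the toy exec proc are illustrative stand-ins for the real SqStoreRef backend, and only the BEGIN/COMMIT/ROLLBACK control flow loosely mirrors the withContext template introduced further down.

# Hypothetical stand-ins: ToyDb/exec only record SQL; the real backend is SqStoreRef.
type
  ToyDb = object
    log: seq[string]            # SQL that would be sent to SQLite

proc exec(db: var ToyDb, sql: string) =
  db.log.add(sql)               # the real code checks the result and may roll back

template withContext(db: var ToyDb, body: untyped): untyped =
  ## Run `body` inside a single transaction - one fsync for the whole batch.
  db.exec("BEGIN TRANSACTION;")
  try:
    body
    db.exec("COMMIT TRANSACTION;")
  except CatchableError:
    db.exec("ROLLBACK TRANSACTION;")
    raise

when isMainModule:
  var db = ToyDb()
  db.withContext:
    for i in 0 ..< 100:
      # one write per attestation, but only one commit for the batch
      db.exec("INSERT INTO signed_attestations VALUES (" & $i & ");")
  echo db.log.len               # 102 = BEGIN + 100 INSERTs + COMMIT

In the actual change, the validator client and the beacon node build a seq[RegisteredAttestation] inside withContext and only sign and publish once the batch has been committed, as the diff below shows.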
@@ -8,7 +8,7 @@
import
std/sets,
chronicles,
../validators/activity_metrics,
../validators/[activity_metrics, validator_duties],
"."/[common, api]

const
@@ -22,42 +22,22 @@ type
selection_proof: ValidatorSig
validator: AttachedValidator

proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
duty: DutyAndProof): Future[bool] {.async.} =
let vc = service.client
let validator = vc.getValidatorForDuties(
duty.data.pubkey, adata.slot).valueOr:
return false
let fork = vc.forkAtEpoch(adata.slot.epoch)

doAssert(validator.index.isSome())
let vindex = validator.index.get()
proc serveAttestation(
service: AttestationServiceRef, registered: RegisteredAttestation):
Future[bool] {.async.} =
let
vc = service.client
fork = vc.forkAtEpoch(registered.data.slot.epoch)
validator = registered.validator

logScope:
validator = validatorLog(validator)

# TODO: signing_root is recomputed in getAttestationSignature just after,
# but not for locally attached validators.
let signingRoot =
compute_attestation_signing_root(
fork, vc.beaconGenesis.genesis_validators_root, adata)

let notSlashable = vc.attachedValidators[].slashingProtection
.registerAttestation(vindex, validator.pubkey,
adata.source.epoch,
adata.target.epoch, signingRoot)
if notSlashable.isErr():
warn "Slashing protection activated for attestation",
attestationData = shortLog(adata),
signingRoot = shortLog(signingRoot),
badVoteDetails = $notSlashable.error
return false

let attestation = block:
let signature =
try:
let res = await validator.getAttestationSignature(
fork, vc.beaconGenesis.genesis_validators_root, adata)
fork, vc.beaconGenesis.genesis_validators_root, registered.data)
if res.isErr():
warn "Unable to sign attestation", reason = res.error()
return false
@@ -69,15 +49,11 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
error "An unexpected error occurred while signing attestation",
err_name = exc.name, err_msg = exc.msg
return false

Attestation.init(
[duty.data.validator_committee_index],
int(duty.data.committee_length), adata, signature).expect(
"data validity checked earlier")
registered.toAttestation(signature)

logScope:
attestation = shortLog(attestation)
delay = vc.getDelay(adata.slot.attestation_deadline())
delay = vc.getDelay(registered.data.slot.attestation_deadline())

debug "Sending attestation"

@@ -98,7 +74,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
return false

if res:
let delay = vc.getDelay(adata.slot.attestation_deadline())
let delay = vc.getDelay(attestation.data.slot.attestation_deadline())
beacon_attestations_sent.inc()
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
notice "Attestation published"
@@ -176,57 +152,94 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
): Future[AttestationData] {.
async.} =
doAssert(MAX_VALIDATORS_PER_COMMITTEE <= uint64(high(int)))
let vc = service.client
let
vc = service.client
fork = vc.forkAtEpoch(slot.epoch)

# This call could raise ValidatorApiError, but it is handled in
# publishAttestationsAndAggregates().
let ad = await vc.produceAttestationData(slot, committee_index,
ApiStrategyKind.Best)
let data = await vc.produceAttestationData(slot, committee_index,
ApiStrategyKind.Best)

let pendingAttestations =
block:
var res: seq[Future[bool]]
for duty in duties:
debug "Serving attestation duty", duty = duty.data, epoch = slot.epoch()
if (duty.data.slot != ad.slot) or
(uint64(duty.data.committee_index) != ad.index):
warn "Inconsistent validator duties during attestation signing",
validator = shortLog(duty.data.pubkey),
duty_slot = duty.data.slot,
duty_index = duty.data.committee_index,
attestation_slot = ad.slot, attestation_index = ad.index
continue
res.add(service.serveAttestation(ad, duty))
res
let registeredRes = vc.attachedValidators[].slashingProtection.withContext:
var tmp: seq[RegisteredAttestation]
for duty in duties:
if (duty.data.slot != data.slot) or
(uint64(duty.data.committee_index) != data.index):
warn "Inconsistent validator duties during attestation signing",
validator = shortLog(duty.data.pubkey),
duty_slot = duty.data.slot,
duty_index = duty.data.committee_index,
attestation_slot = data.slot, attestation_index = data.index
continue

let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAttestations)
except CancelledError as exc:
let pending = pendingAttestations
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
let validator = vc.getValidatorForDuties(
duty.data.pubkey, duty.data.slot).valueOr:
continue

for future in pendingAttestations:
if future.completed():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
doAssert(validator.index.isSome())
let validator_index = validator.index.get()

let delay = vc.getDelay(slot.attestation_deadline())
debug "Attestation statistics", total = len(pendingAttestations),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
committee_index = committee_index, duties_count = len(duties)
logScope:
validator = validatorLog(validator)

return ad
# TODO: signing_root is recomputed in getAttestationSignature just after,
# but not for locally attached validators.
let
signingRoot = compute_attestation_signing_root(
fork, vc.beaconGenesis.genesis_validators_root, data)
registered = registerAttestationInContext(
validator_index, validator.pubkey, data.source.epoch,
data.target.epoch, signingRoot)
if registered.isErr():
warn "Slashing protection activated for attestation",
attestationData = shortLog(data),
signingRoot = shortLog(signingRoot),
badVoteDetails = $registered.error()
continue

tmp.add(RegisteredAttestation(
validator: validator,
index_in_committee: duty.data.validator_committee_index,
committee_len: int duty.data.committee_length,
data: data
))
tmp

if registeredRes.isErr():
warn "Could not update slashing database, skipping attestation duties",
error = registeredRes.error()
else:
let
pendingAttestations = registeredRes[].mapIt(service.serveAttestation(it))
statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAttestations)
except CancelledError as exc:
let pending = pendingAttestations
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc

for future in pendingAttestations:
if future.completed():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)

let delay = vc.getDelay(slot.attestation_deadline())
debug "Attestation statistics", total = len(pendingAttestations),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
committee_index = committee_index, duties_count = len(duties)

return data

proc produceAndPublishAggregates(service: AttestationServiceRef,
adata: AttestationData,
@@ -329,8 +342,6 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
committee_index: CommitteeIndex,
duties: seq[DutyAndProof]) {.async.} =
let vc = service.client
# Waiting for blocks to be published before attesting.
await vc.waitForBlock(slot, attestationSlotOffset)

block:
let delay = vc.getDelay(slot.attestation_deadline())
@@ -378,6 +389,9 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
res.mgetOrPut(item.data.committee_index, default).add(item)
res

# Waiting for blocks to be published before attesting.
await vc.waitForBlock(slot, attestationSlotOffset)

var tasks: seq[Future[void]]
try:
for index, duties in dutiesByCommittee:
@@ -331,27 +331,23 @@ proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async.} =
proc createAndSendAttestation(node: BeaconNode,
fork: Fork,
genesis_validators_root: Eth2Digest,
validator: AttachedValidator,
data: AttestationData,
committeeLen: int,
indexInCommittee: int,
registered: RegisteredAttestation,
subnet_id: SubnetId) {.async.} =
try:
let
signature = block:
let res = await validator.getAttestationSignature(
fork, genesis_validators_root, data)
let res = await registered.validator.getAttestationSignature(
fork, genesis_validators_root, registered.data)
if res.isErr():
warn "Unable to sign attestation", validator = shortLog(validator),
attestationData = shortLog(data), error_msg = res.error()
warn "Unable to sign attestation",
validator = shortLog(registered.validator),
attestationData = shortLog(registered.data),
error_msg = res.error()
return
res.get()
attestation =
Attestation.init(
[uint64 indexInCommittee], committeeLen, data, signature).expect(
"valid data")
attestation = registered.toAttestation(signature)

validator.doppelgangerActivity(attestation.data.slot.epoch)
registered.validator.doppelgangerActivity(attestation.data.slot.epoch)

# Logged in the router
let res = await node.router.routeAttestation(
@@ -360,7 +356,9 @@ proc createAndSendAttestation(node: BeaconNode,
return

if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, attestation.data, validator.pubkey)
dump(
node.config.dumpDirOutgoing, attestation.data,
registered.validator.pubkey)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
@@ -566,8 +564,8 @@ proc makeBeaconBlockForHeadAndSlot*(
PayloadType: type ForkyExecutionPayloadForSigning, node: BeaconNode, randao_reveal: ValidatorSig,
validator_index: ValidatorIndex, graffiti: GraffitiBytes, head: BlockRef,
slot: Slot):
Future[ForkedBlockResult] {.async.} =
return await makeBeaconBlockForHeadAndSlot(
Future[ForkedBlockResult] =
return makeBeaconBlockForHeadAndSlot(
PayloadType, node, randao_reveal, validator_index, graffiti, head, slot,
execution_payload = Opt.none(PayloadType),
transactions_root = Opt.none(Eth2Digest),
@@ -1245,41 +1243,51 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
committees_per_slot = get_committee_count_per_slot(epochRef.shufflingRef)
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
registeredRes = node.attachedValidators.slashingProtection.withContext:
var tmp: seq[(RegisteredAttestation, SubnetId)]

for committee_index in get_committee_indices(committees_per_slot):
let committee = get_beacon_committee(
epochRef.shufflingRef, slot, committee_index)
for committee_index in get_committee_indices(committees_per_slot):
let
committee = get_beacon_committee(
epochRef.shufflingRef, slot, committee_index)
subnet_id = compute_subnet_for_attestation(
committees_per_slot, slot, committee_index)

for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
continue
for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
continue

let
data = makeAttestationData(epochRef, attestationHead, committee_index)
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
signingRoot = compute_attestation_signing_root(
fork, genesis_validators_root, data)
registered = node.attachedValidators
.slashingProtection
.registerAttestation(
validator_index,
validator.pubkey,
data.source.epoch,
data.target.epoch,
signingRoot)
if registered.isOk():
let subnet_id = compute_subnet_for_attestation(
committees_per_slot, data.slot, committee_index)
asyncSpawn createAndSendAttestation(
node, fork, genesis_validators_root, validator, data,
committee.len(), index_in_committee, subnet_id)
else:
warn "Slashing protection activated for attestation",
attestationData = shortLog(data),
signingRoot = shortLog(signingRoot),
validator_index,
validator = shortLog(validator),
badVoteDetails = $registered.error()
let
data = makeAttestationData(epochRef, attestationHead, committee_index)
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
signingRoot = compute_attestation_signing_root(
fork, genesis_validators_root, data)
registered = registerAttestationInContext(
validator_index, validator.pubkey, data.source.epoch,
data.target.epoch, signingRoot)
if registered.isErr():
warn "Slashing protection activated for attestation",
attestationData = shortLog(data),
signingRoot = shortLog(signingRoot),
validator_index,
validator = shortLog(validator),
badVoteDetails = $registered.error()
continue

tmp.add((RegisteredAttestation(
validator: validator,
index_in_committee: uint64 index_in_committee,
committee_len: committee.len(), data: data), subnet_id
))
tmp

if registeredRes.isErr():
warn "Could not update slashing database, skipping attestation duties",
error = registeredRes.error()
else:
for attestation in registeredRes[]:
asyncSpawn createAndSendAttestation(
node, fork, genesis_validators_root, attestation[0], attestation[1])

proc createAndSendSyncCommitteeMessage(node: BeaconNode,
validator: AttachedValidator,
@@ -154,7 +154,7 @@ proc checkSlashableBlockProposal*(
## The error contains the blockroot that was already proposed
##
## Returns success otherwise
checkSlashableBlockProposal(db.db_v2, some(index), validator, slot)
checkSlashableBlockProposal(db.db_v2, Opt.some(index), validator, slot)

proc checkSlashableAttestation*(
db: SlashingProtectionDB,
@@ -169,7 +169,7 @@ proc checkSlashableAttestation*(
## (surrounding vote or surrounded vote).
##
## Returns success otherwise
checkSlashableAttestation(db.db_v2, some(index), validator, source, target)
checkSlashableAttestation(db.db_v2, Opt.some(index), validator, source, target)

# DB Updates - only v2 supported here
# --------------------------------------------
@@ -184,7 +184,7 @@ proc registerBlock*(
##
## block_signing_root is the output of
## compute_signing_root(block, domain)
registerBlock(db.db_v2, some(index), validator, slot, block_signing_root)
registerBlock(db.db_v2, Opt.some(index), validator, slot, block_signing_root)

proc registerAttestation*(
db: SlashingProtectionDB,
@@ -197,9 +197,22 @@ proc registerAttestation*(
##
## attestation_signing_root is the output of
## compute_signing_root(attestation, domain)
registerAttestation(db.db_v2, some(index), validator,
registerAttestation(db.db_v2, Opt.some(index), validator,
source, target, attestation_signing_root)

template withContext*(db: SlashingProtectionDB, body: untyped): untyped =
## Perform multiple slashing database operations within a single database
## context
db.db_v2.withContext:
template registerAttestationInContext(
index: ValidatorIndex,
validator: ValidatorPubKey,
source, target: Epoch,
attestation_signing_root: Eth2Digest): Result[void, BadVote] =
registerAttestationInContextV2(Opt.some(index), validator, source, target, attestation_signing_root)
block:
body

# DB maintenance
# --------------------------------------------
# private for now
@@ -9,9 +9,10 @@

import
# Standard library
std/[os, options, typetraits, decls, tables],
std/[os, typetraits, decls, tables],
# Status
stew/byteutils,
results,
eth/db/[kvstore, kvstore_sqlite3],
chronicles,
sqlite3_abi,
@@ -20,6 +21,8 @@ import
../spec/helpers,
./slashing_protection_common

export results

# Requirements
# --------------------------------------------
#
@@ -609,7 +612,7 @@ func getRawDBHandle*(db: SlashingProtectionDB_v2): SqStoreRef =
## Get the underlying raw DB handle
db.backend

proc getMetadataTable_DbV2*(db: SlashingProtectionDB_v2): Option[Eth2Digest] =
proc getMetadataTable_DbV2*(db: SlashingProtectionDB_v2): Opt[Eth2Digest] =
## Check if the DB has v2 metadata
## and get its genesis root
let existenceStmt = db.backend.prepareStmt("""
@@ -630,9 +633,9 @@ proc getMetadataTable_DbV2*(db: SlashingProtectionDB_v2): Option[Eth2Digest] =


if v2exists.isErr():
return none(Eth2Digest)
return Opt.none(Eth2Digest)
elif hasV2 == 0:
return none(Eth2Digest)
return Opt.none(Eth2Digest)

let selectStmt = db.backend.prepareStmt(
"SELECT * FROM metadata;",
@@ -655,9 +658,9 @@ proc getMetadataTable_DbV2*(db: SlashingProtectionDB_v2): Option[Eth2Digest] =
found = version,
expected = db.typeof.version()
quit 1
return some(root)
return Opt.some(root)
else:
return none(Eth2Digest)
return Opt.none(Eth2Digest)

proc initCompatV1*(
T: type SlashingProtectionDB_v2,
@@ -773,8 +776,8 @@ proc foundAnyResult(status: KvResult[bool]): bool {.inline.}=

proc getValidatorInternalID(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
validator: ValidatorPubKey): Option[ValidatorInternalID] =
index: Opt[ValidatorIndex],
validator: ValidatorPubKey): Opt[ValidatorInternalID] =
## Retrieve a validator internal ID
if index.isSome():
# Validator keys are mapped to internal id:s instead of using the
@@ -785,7 +788,7 @@ proc getValidatorInternalID(
# validator index. In the meantime, this cache avoids some of the
# unnecessary read traffic when checking and registering entries.
db.internalIds.withValue(index.get(), internal) do:
return some(internal[])
return Opt.some(internal[])

let serializedPubkey = validator.toRaw() # Miracl/BLST to bytes
var valID: ValidatorInternalID
@@ -796,9 +799,9 @@ proc getValidatorInternalID(
if status.foundAnyResult():
if index.isSome():
db.internalIds[index.get()] = valID
some(valID)
Opt.some(valID)
else:
none(ValidatorInternalID)
Opt.none(ValidatorInternalID)

proc checkSlashableBlockProposalOther(
db: SlashingProtectionDB_v2,
@@ -887,7 +890,7 @@ proc checkSlashableBlockProposalDoubleProposal(

proc checkSlashableBlockProposal*(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey,
slot: Slot
): Result[void, BadProposal] =
@@ -1049,7 +1052,7 @@ proc checkSlashableAttestationOther(

proc checkSlashableAttestation*(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey,
source: Epoch,
target: Epoch
@@ -1084,7 +1087,7 @@ proc registerValidator(db: SlashingProtectionDB_v2, validator: ValidatorPubKey)

proc getOrRegisterValidator(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey): ValidatorInternalID =
## Get validator from the database
## or register it and then return it
@@ -1102,7 +1105,7 @@ proc getOrRegisterValidator(

proc registerBlock*(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey,
slot: Slot, block_root: Eth2Digest): Result[void, BadProposal] =
## Add a block to the slashing protection DB
@@ -1138,11 +1141,11 @@ proc registerBlock*(
db: SlashingProtectionDB_v2,
validator: ValidatorPubKey,
slot: Slot, block_root: Eth2Digest): Result[void, BadProposal] =
registerBlock(db, none(ValidatorIndex), validator, slot, block_root)
registerBlock(db, Opt.none(ValidatorIndex), validator, slot, block_root)

proc registerAttestation*(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey,
source, target: Epoch,
attestation_root: Eth2Digest): Result[void, BadVote] =
@@ -1187,13 +1190,52 @@ proc registerAttestation*(
source, target: Epoch,
attestation_root: Eth2Digest): Result[void, BadVote] =
registerAttestation(
db, none(ValidatorIndex), validator, source, target, attestation_root)
db, Opt.none(ValidatorIndex), validator, source, target, attestation_root)

template withContext*(dbParam: SlashingProtectionDB_v2, body: untyped): untyped =
let
db = dbParam

template registerAttestationInContextV2(
index: Opt[ValidatorIndex],
validator: ValidatorPubKey,
source, target: Epoch,
signing_root: Eth2Digest): Result[void, BadVote] =
registerAttestation(db, index, validator, source, target, signing_root)

var
commit = false
res: Result[typeof(body), string]
beginRes = db.backend.exec("BEGIN TRANSACTION;")
if beginRes.isErr(): # always lovely handling errors in templates
res.err(beginRes.error())
else:
try:
when type(body) is void:
body
commit = true
else:
res.ok(body)
commit = true
finally:
if commit:
let commit = db.backend.exec("COMMIT TRANSACTION;")
if commit.isErr:
res.err(commit.error())
else:
# Exception was raised from body - catch/reraise would cause the wrong
# exception effect..
if isInsideTransaction(db.backend): # calls `sqlite3_get_autocommit`
discard db.backend.exec("ROLLBACK TRANSACTION;")
res.err("Rolled back")

res

# DB maintenance
# --------------------------------------------
proc pruneBlocks*(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey, newMinSlot: Slot) =
## Prune all blocks from a validator before the specified newMinSlot
## This is intended for interchange import to ensure
@@ -1208,11 +1250,11 @@ proc pruneBlocks*(
proc pruneBlocks*(
db: SlashingProtectionDB_v2,
validator: ValidatorPubKey, newMinSlot: Slot) =
pruneBlocks(db, none(ValidatorIndex), validator, newMinSlot)
pruneBlocks(db, Opt.none(ValidatorIndex), validator, newMinSlot)

proc pruneAttestations*(
db: SlashingProtectionDB_v2,
index: Option[ValidatorIndex],
index: Opt[ValidatorIndex],
validator: ValidatorPubKey,
newMinSourceEpoch: int64,
newMinTargetEpoch: int64) =
@@ -1236,7 +1278,7 @@ proc pruneAttestations*(
newMinSourceEpoch: int64,
newMinTargetEpoch: int64) =
pruneAttestations(
db, none(ValidatorIndex), validator, newMinSourceEpoch, newMinTargetEpoch)
db, Opt.none(ValidatorIndex), validator, newMinSourceEpoch, newMinTargetEpoch)

proc pruneAfterFinalization*(
db: SlashingProtectionDB_v2,
@@ -1274,11 +1316,11 @@ proc retrieveLatestValidatorData*(
db: SlashingProtectionDB_v2,
validator: ValidatorPubKey
): tuple[
maxBlockSlot: Option[Slot],
maxAttSourceEpoch: Option[Epoch],
maxAttTargetEpoch: Option[Epoch]] =
maxBlockSlot: Opt[Slot],
maxAttSourceEpoch: Opt[Epoch],
maxAttTargetEpoch: Opt[Epoch]] =

let valID = db.getOrRegisterValidator(none(ValidatorIndex), validator)
let valID = db.getOrRegisterValidator(Opt.none(ValidatorIndex), validator)

var slot, source, target: int64
let status = db.sqlMaxBlockAtt.exec(
@@ -1299,11 +1341,11 @@ proc retrieveLatestValidatorData*(
# but let's deal with those here

if slot != 0:
result.maxBlockSlot = some(Slot slot)
result.maxBlockSlot = Opt.some(Slot slot)
if source != 0:
result.maxAttSourceEpoch = some(Epoch source)
result.maxAttSourceEpoch = Opt.some(Epoch source)
if target != 0:
result.maxAttTargetEpoch = some(Epoch target)
result.maxAttTargetEpoch = Opt.some(Epoch target)

proc registerSyntheticAttestation*(
db: SlashingProtectionDB_v2,
@@ -1314,7 +1356,7 @@ proc registerSyntheticAttestation*(
# Spec require source < target (except genesis?), for synthetic attestation for slashing protection we want max(source, target)
doAssert (source < target) or (source == Epoch(0) and target == Epoch(0))

let valID = db.getOrRegisterValidator(none(ValidatorIndex), validator)
let valID = db.getOrRegisterValidator(Opt.none(ValidatorIndex), validator)

# Overflows in 14 trillion years (minimal) or 112 trillion years (mainnet)
doAssert source <= high(int64).uint64
@@ -11,12 +11,29 @@ import
chronos,
results,
../consensus_object_pools/block_dag,
../beacon_clock
../beacon_clock,
"."/[validator_pool]

export chronos, results, block_dag, beacon_clock

# The validator_duties module contains logic and utilities related to performing
# validator duties that are shared between beacon node and validator client.
## The validator_duties module contains logic and utilities related to performing
## validator duties that are shared between beacon node and validator client.

type
RegisteredAttestation* = object
# A registered attestation is one that has been successfully registered in
# the slashing protection database and is therefore ready to be signed and
# sent
validator*: AttachedValidator
index_in_committee*: uint64
committee_len*: int
data*: AttestationData

proc toAttestation*(
registered: RegisteredAttestation, signature: ValidatorSig): Attestation =
Attestation.init(
[registered.index_in_committee], registered.committee_len,
registered.data, signature).expect("valid data")

proc waitAfterBlockCutoff*(clock: BeaconClock, slot: Slot,
head: Opt[BlockRef] = Opt.none(BlockRef)) {.async.} =
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or https://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or https://opensource.org/licenses/MIT)
@@ -182,7 +182,8 @@ proc runTest(identifier: string) =

for blck in step.blocks:
let pubkey = ValidatorPubKey.fromRaw(blck.pubkey.PubKeyBytes).get()
let status = db.db_v2.checkSlashableBlockProposal(none(ValidatorIndex),
let status = db.db_v2.checkSlashableBlockProposal(
Opt.none(ValidatorIndex),
pubkey,
Slot blck.slot
)
@@ -196,7 +197,7 @@ proc runTest(identifier: string) =
# Successful blocks are to be incoporated in the DB
if status.isOk(): # Skip duplicates
let status = db.db_v2.registerBlock(
none(ValidatorIndex),
Opt.none(ValidatorIndex),
pubkey, Slot blck.slot,
Eth2Digest blck.signing_root
)
@@ -212,7 +213,7 @@ proc runTest(identifier: string) =
for att in step.attestations:
let pubkey = ValidatorPubKey.fromRaw(att.pubkey.PubKeyBytes).get()

let status = db.db_v2.checkSlashableAttestation(none(ValidatorIndex),
let status = db.db_v2.checkSlashableAttestation(Opt.none(ValidatorIndex),
pubkey,
Epoch att.source_epoch,
Epoch att.target_epoch
@@ -227,7 +228,7 @@ proc runTest(identifier: string) =
# Successful attestations are to be incoporated in the DB
if status.isOk(): # Skip duplicates
let status = db.db_v2.registerAttestation(
none(ValidatorIndex),
Opt.none(ValidatorIndex),
pubkey,
Epoch att.source_epoch,
Epoch att.target_epoch,