nimbus-eth2/beacon_chain/validator_client/sync_committee_service.nim
Jacek Sieka 83f9745df1
restore doppelganger check on connectivity loss (#4616)
* restore doppelganger check on connectivity loss

https://github.com/status-im/nimbus-eth2/pull/4398 introduced a
regression in functionality where doppelganger detection would not be
rerun during connectivity loss. This PR reintroduces this check and
makes some adjustments to the implementation to simplify the code flow
for both BN and VC.

* track when check was last performed for each validator (to deal with
late-added validators)
* track when we performed a doppel-detectable activity (attesting) so as
to avoid false positives
* remove nodeStart special case (this should be treated the same as
adding a validator dynamically just after startup)

* allow sync committee duties in doppelganger period

* don't trigger doppelganger when registering duties

* fix crash when expected index response is missing

* fix missing slashingSafe propagation
2023-02-20 13:28:56 +02:00

456 lines
16 KiB
Nim

# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/sets,
metrics, chronicles,
../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types,
../validators/activity_metrics,
"."/[common, api, block_service]
const
ServiceName = "sync_committee_service"
logScope: service = ServiceName
type
ContributionItem* = object
aggregator_index: uint64
selection_proof: ValidatorSig
validator: AttachedValidator
subcommitteeIdx: SyncSubcommitteeIndex
proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
slot: Slot, beaconBlockRoot: Eth2Digest,
duty: SyncCommitteeDuty): Future[bool] {.
async.} =
let
vc = service.client
fork = vc.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
vindex = duty.validator_index
subcommitteeIdx = getSubcommitteeIndex(
duty.validator_sync_committee_index)
validator = vc.getValidatorForDuties(
duty.pubkey, slot, slashingSafe = true).valueOr: return false
message =
block:
let res = await getSyncCommitteeMessage(validator, fork,
genesisValidatorsRoot,
slot, beaconBlockRoot)
if res.isErr():
error "Unable to sign committee message using remote signer",
validator = shortLog(validator), slot = slot,
block_root = shortLog(beaconBlockRoot)
return
res.get()
debug "Sending sync committee message", message = shortLog(message),
validator = shortLog(validator), validator_index = vindex,
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
let res =
try:
await vc.submitPoolSyncCommitteeSignature(message, ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to publish sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex
return false
except CancelledError:
debug "Publish sync committee message request was interrupted"
return false
except CatchableError as exc:
error "Unexpected error occurred while publishing sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
let delay = vc.getDelay(message.slot.sync_committee_message_deadline())
if res:
beacon_sync_committee_messages_sent.inc()
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
notice "Sync committee message published",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
delay = delay
else:
warn "Sync committee message was not accepted by beacon node",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex, delay = delay
return res
proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncCommitteeDuty])
{.async.} =
let vc = service.client
let pendingSyncCommitteeMessages =
block:
var res: seq[Future[bool]]
for duty in duties:
debug "Serving sync message duty", duty, epoch = slot.epoch()
res.add(service.serveSyncCommitteeMessage(slot,
beaconBlockRoot,
duty))
res
let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingSyncCommitteeMessages)
except CancelledError as exc:
for fut in pendingSyncCommitteeMessages:
if not(fut.finished()):
fut.cancel()
await allFutures(pendingSyncCommitteeMessages)
raise exc
for future in pendingSyncCommitteeMessages:
if future.done():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(slot.attestation_deadline())
debug "Sync committee message statistics",
total = len(pendingSyncCommitteeMessages),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
duties_count = len(duties)
proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
proof: ContributionAndProof,
validator: AttachedValidator): Future[bool] {.
async.} =
let
vc = service.client
slot = proof.contribution.slot
validatorIdx = validator.index.get()
genesisRoot = vc.beaconGenesis.genesis_validators_root
fork = vc.forkAtEpoch(slot.epoch)
let signature =
block:
let res = await validator.getContributionAndProofSignature(
fork, genesisRoot, proof)
if res.isErr():
error "Unable to sign sync committee contribution using remote signer",
validator = shortLog(validator),
contribution = shortLog(proof.contribution),
error_msg = res.error()
return false
res.get()
debug "Sending sync contribution",
contribution = shortLog(proof.contribution),
validator = shortLog(validator), validator_index = validatorIdx,
delay = vc.getDelay(slot.sync_contribution_deadline())
let restSignedProof = RestSignedContributionAndProof.init(
proof, signature)
let res =
try:
await vc.publishContributionAndProofs(@[restSignedProof],
ApiStrategyKind.First)
except ValidatorApiError as err:
error "Unable to publish sync contribution",
contribution = shortLog(proof.contribution),
validator = shortLog(validator),
validator_index = validatorIdx,
err_msg = err.msg
false
except CancelledError:
debug "Publish sync contribution request was interrupted"
return false
except CatchableError as err:
error "Unexpected error occurred while publishing sync contribution",
contribution = shortLog(proof.contribution),
validator = shortLog(validator),
err_name = err.name, err_msg = err.msg
false
if res:
beacon_sync_committee_contributions_sent.inc()
notice "Sync contribution published",
validator = shortLog(validator),
validator_index = validatorIdx
else:
warn "Sync contribution was not accepted by beacon node",
contribution = shortLog(proof.contribution),
validator = shortLog(validator),
validator_index = validatorIdx
return res
proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncCommitteeDuty]) {.async.} =
let
vc = service.client
epoch = slot.epoch
fork = vc.forkAtEpoch(epoch)
var slotSignatureReqs: seq[Future[SignatureResult]]
var validators: seq[(AttachedValidator, SyncSubcommitteeIndex)]
for duty in duties:
let
validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr:
continue
subCommitteeIdx =
getSubcommitteeIndex(duty.validator_sync_committee_index)
future = validator.getSyncCommitteeSelectionProof(
fork,
vc.beaconGenesis.genesis_validators_root,
slot,
subCommitteeIdx)
slotSignatureReqs.add(future)
validators.add((validator, subCommitteeIdx))
try:
await allFutures(slotSignatureReqs)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
for future in slotSignatureReqs:
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
raise exc
var
contributionsFuts: array[SYNC_COMMITTEE_SUBNET_COUNT,
Future[SyncCommitteeContribution]]
let validatorContributions = block:
var res: seq[ContributionItem]
for idx, fut in slotSignatureReqs:
if fut.done:
let
sigRes = fut.read
validator = validators[idx][0]
subCommitteeIdx = validators[idx][1]
if sigRes.isErr:
error "Unable to create slot signature using remote signer",
validator = shortLog(validator),
error_msg = sigRes.error()
elif validator.index.isSome and
is_sync_committee_aggregator(sigRes.get):
res.add ContributionItem(
aggregator_index: uint64(validator.index.get),
selection_proof: sigRes.get,
validator: validator,
subcommitteeIdx: subCommitteeIdx)
if isNil(contributionsFuts[subCommitteeIdx]):
contributionsFuts[int subCommitteeIdx] =
vc.produceSyncCommitteeContribution(
slot,
subCommitteeIdx,
beaconBlockRoot,
ApiStrategyKind.Best)
res
if len(validatorContributions) > 0:
let pendingAggregates =
block:
var res: seq[Future[bool]]
for item in validatorContributions:
let aggContribution =
try:
await contributionsFuts[item.subcommitteeIdx]
except ValidatorApiError:
error "Unable to get sync message contribution data", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot)
return
except CancelledError:
debug "Request for sync message contribution was interrupted"
return
except CatchableError as exc:
error "Unexpected error occurred while getting sync message "&
"contribution", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot),
err_name = exc.name, err_msg = exc.msg
return
let proof = ContributionAndProof(
aggregator_index: item.aggregator_index,
contribution: aggContribution,
selection_proof: item.selection_proof
)
res.add(service.serveContributionAndProof(proof, item.validator))
res
let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAggregates)
except CancelledError as err:
for fut in pendingAggregates:
if not(fut.finished()):
fut.cancel()
await allFutures(pendingAggregates)
raise err
for future in pendingAggregates:
if future.done():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(slot.aggregate_deadline())
debug "Sync message contribution statistics",
total = len(pendingAggregates),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot
else:
debug "No contribution and proofs scheduled for slot", slot = slot
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
slot: Slot,
duties: seq[SyncCommitteeDuty]) {.
async.} =
let vc = service.client
await vc.waitForBlockPublished(slot, syncCommitteeMessageSlotOffset)
block:
let delay = vc.getDelay(slot.sync_committee_message_deadline())
debug "Producing sync committee messages", delay = delay, slot = slot,
duties_count = len(duties)
let beaconBlockRoot =
block:
try:
let res = await vc.getHeadBlockRoot(ApiStrategyKind.First)
if res.execution_optimistic.isNone():
## The `execution_optimistic` is missing from the response, we assume
## that the BN is unaware optimistic sync, so we consider the BN
## to be synchronized with the network.
## TODO (cheatfate): This should be removed when VC will be able to
## handle getSpec() API call with fork constants.
res.data.root
else:
if res.execution_optimistic.get():
error "Could not obtain head block's root because beacon node " &
"only optimistically synced", slot = slot
return
res.data.root
except ValidatorApiError as exc:
error "Unable to retrieve head block's root to sign", reason = exc.msg
return
except CancelledError:
debug "Block root request was interrupted"
return
except CatchableError as exc:
error "Unexpected error while requesting sync message block root",
err_name = exc.name, err_msg = exc.msg, slot = slot
return
try:
await service.produceAndPublishSyncCommitteeMessages(slot,
beaconBlockRoot,
duties)
except ValidatorApiError:
error "Unable to proceed sync committee messages", slot = slot,
duties_count = len(duties)
return
except CancelledError:
debug "Sync committee producing process was interrupted"
return
except CatchableError as exc:
error "Unexpected error while producing sync committee messages",
slot = slot,
duties_count = len(duties),
err_name = exc.name, err_msg = exc.msg
return
let contributionTime =
# chronos.Duration subtraction cannot return a negative value; in such
# case it will return `ZeroDuration`.
vc.beaconClock.durationToNextSlot() - OneThirdDuration
if contributionTime != ZeroDuration:
await sleepAsync(contributionTime)
block:
let delay = vc.getDelay(slot.sync_contribution_deadline())
debug "Producing contribution and proofs", delay = delay
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) =
let
vc = service.client
duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
asyncSpawn service.publishSyncMessagesAndContributions(slot, duties)
proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
debug "Service started"
debug "Sync committee duties loop waiting for fork schedule update"
await vc.forksAvailable.wait()
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
let sleepTime =
syncCommitteeMessageSlotOffset + vc.beaconClock.durationToNextSlot()
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
service.spawnSyncCommitteeTasks(currentSlot)
await sleepAsync(sleepTime)
false
except CancelledError:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[SyncCommitteeServiceRef],
vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} =
logScope: service = ServiceName
let res = SyncCommitteeServiceRef(name: ServiceName, client: vc,
state: ServiceState.Initialized)
debug "Initializing service"
return res
proc start*(service: SyncCommitteeServiceRef) =
service.lifeFut = mainLoop(service)