VC: Fix doppelganger protection never allow attestations. (#4236)
* Fix doppelganger protection reorders validator indices in response issue. * Add chronos metrics endpoint to nimbus REST API. * Doppelganger protection now works on duties not on attestations. Improve logging for doppelganger and indices. * Improve doppelganger and indices logging. * Add number of validators to logs. * Move logging dumps from `debug` to `trace` level.
This commit is contained in:
parent
fb983f867f
commit
805a12e467
|
@ -171,6 +171,7 @@ proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
|
|||
slot = shortLog(wallSlot.slot),
|
||||
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
|
||||
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
|
||||
validators = vc.attachedValidators[].count(),
|
||||
delay = shortLog(delay)
|
||||
|
||||
return false
|
||||
|
|
|
@ -112,6 +112,8 @@ const
|
|||
"Invalid validator's index value(s)"
|
||||
EmptyValidatorIndexArrayError* =
|
||||
"Empty validator's index array"
|
||||
DuplicateValidatorIndexArrayError* =
|
||||
"Duplicate validator index found in array"
|
||||
InvalidSubscriptionRequestValueError* =
|
||||
"Invalid subscription request object(s)"
|
||||
ValidatorNotFoundError* =
|
||||
|
|
|
@ -51,6 +51,19 @@ type
|
|||
line*: int
|
||||
state*: string
|
||||
|
||||
RestChronosMetricsInfo* = object
|
||||
tcp_transports*: uint64
|
||||
udp_transports*: uint64
|
||||
tcp_servers*: uint64
|
||||
stream_readers*: uint64
|
||||
stream_writers*: uint64
|
||||
http_client_connections*: uint64
|
||||
http_client_requests*: uint64
|
||||
http_client_responses*: uint64
|
||||
http_server_connections*: uint64
|
||||
http_body_readers*: uint64
|
||||
http_body_writers*: uint64
|
||||
|
||||
RestPubSubPeer* = object
|
||||
peerId*: PeerId
|
||||
score*: float64
|
||||
|
@ -264,6 +277,33 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
return RestApiResponse.jsonError(Http503,
|
||||
"Compile with '-d:chronosFutureTracking' to get this request working")
|
||||
|
||||
router.api(MethodGet, "/nimbus/v1/debug/chronos/metrics") do (
|
||||
) -> RestApiResponse:
|
||||
|
||||
template getCount(ttype: untyped, name: string): uint64 =
|
||||
let res = ttype(getTracker(name))
|
||||
if res.isNil(): 0'u64 else: uint64(res.opened - res.closed)
|
||||
|
||||
let res = RestChronosMetricsInfo(
|
||||
tcp_transports: getCount(StreamTransportTracker, "stream.transport"),
|
||||
udp_transports: getCount(DgramTransportTracker, "datagram.transport"),
|
||||
tcp_servers: getCount(StreamServerTracker, "stream.server"),
|
||||
stream_readers: getCount(AsyncStreamTracker,
|
||||
AsyncStreamReaderTrackerName),
|
||||
stream_writers: getCount(AsyncStreamTracker,
|
||||
AsyncStreamWriterTrackerName),
|
||||
http_client_connections: getCount(HttpClientTracker,
|
||||
HttpClientConnectionTrackerName),
|
||||
http_client_requests: getCount(HttpClientTracker,
|
||||
HttpClientRequestTrackerName),
|
||||
http_client_responses: getCount(HttpClientTracker,
|
||||
HttpClientResponseTrackerName),
|
||||
http_server_connections: lenu64(node.restServer.server.connections),
|
||||
http_body_readers: getCount(HttpBodyTracker, HttpBodyReaderTrackerName),
|
||||
http_body_writers: getCount(HttpBodyTracker, HttpBodyWriterTrackerName)
|
||||
)
|
||||
return RestApiResponse.jsonResponse(res)
|
||||
|
||||
router.api(MethodPost, "/nimbus/v1/validator/activity/{epoch}") do (
|
||||
epoch: Epoch, contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||
let indexList =
|
||||
|
@ -275,7 +315,10 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
return RestApiResponse.jsonError(Http400,
|
||||
InvalidValidatorIndexValueError,
|
||||
$dres.error())
|
||||
var res: HashSet[ValidatorIndex]
|
||||
var
|
||||
res: seq[ValidatorIndex]
|
||||
dupset: HashSet[ValidatorIndex]
|
||||
|
||||
let items = dres.get()
|
||||
for item in items:
|
||||
let vres = item.toValidatorIndex()
|
||||
|
@ -287,7 +330,12 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
of ValidatorIndexError.UnsupportedValue:
|
||||
return RestApiResponse.jsonError(Http500,
|
||||
UnsupportedValidatorIndexValueError)
|
||||
res.incl(vres.get())
|
||||
let index = vres.get()
|
||||
if index in dupset:
|
||||
return RestApiResponse.jsonError(Http400,
|
||||
DuplicateValidatorIndexArrayError)
|
||||
dupset.incl(index)
|
||||
res.add(index)
|
||||
if len(res) == 0:
|
||||
return RestApiResponse.jsonError(Http400,
|
||||
EmptyValidatorIndexArrayError)
|
||||
|
|
|
@ -25,23 +25,12 @@ type
|
|||
proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
|
||||
duty: DutyAndProof): Future[bool] {.async.} =
|
||||
let vc = service.client
|
||||
let validator =
|
||||
block:
|
||||
let res = vc.getValidator(duty.data.pubkey)
|
||||
if res.isNone():
|
||||
return false
|
||||
res.get()
|
||||
let validator = vc.getValidator(duty.data.pubkey).valueOr: return false
|
||||
let fork = vc.forkAtEpoch(adata.slot.epoch)
|
||||
|
||||
doAssert(validator.index.isSome())
|
||||
let vindex = validator.index.get()
|
||||
|
||||
if not(vc.doppelgangerCheck(validator)):
|
||||
info "Attestation has not been served (doppelganger check still active)",
|
||||
slot = duty.data.slot, validator = shortLog(validator),
|
||||
validator_index = vindex
|
||||
return false
|
||||
|
||||
# TODO: signing_root is recomputed in getAttestationSignature just after,
|
||||
# but not for locally attached validators.
|
||||
let signingRoot =
|
||||
|
@ -133,13 +122,6 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
|
|||
vindex = validator.index.get()
|
||||
fork = vc.forkAtEpoch(slot.epoch)
|
||||
|
||||
if not(vc.doppelgangerCheck(validator)):
|
||||
info "Aggregate attestation has not been served " &
|
||||
"(doppelganger check still active)",
|
||||
slot = slot, validator = shortLog(validator),
|
||||
validator_index = vindex
|
||||
return false
|
||||
|
||||
debug "Signing aggregate", validator = shortLog(validator),
|
||||
attestation = shortLog(proof.aggregate), fork = fork
|
||||
|
||||
|
@ -421,9 +403,19 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
|
|||
for item in attesters:
|
||||
res.mgetOrPut(item.data.committee_index, default).add(item)
|
||||
res
|
||||
|
||||
var dutiesSkipped: seq[string]
|
||||
for index, duties in dutiesByCommittee:
|
||||
if len(duties) > 0:
|
||||
asyncSpawn service.publishAttestationsAndAggregates(slot, index, duties)
|
||||
let (protectedDuties, skipped) = vc.doppelgangerFilter(duties)
|
||||
if len(skipped) > 0: dutiesSkipped.add(skipped)
|
||||
if len(protectedDuties) > 0:
|
||||
asyncSpawn service.publishAttestationsAndAggregates(slot, index,
|
||||
protectedDuties)
|
||||
if len(dutiesSkipped) > 0:
|
||||
info "Doppelganger protection disabled validator duties",
|
||||
validators = len(dutiesSkipped)
|
||||
trace "Doppelganger protection disabled validator duties dump",
|
||||
validators = dutiesSkipped
|
||||
|
||||
proc mainLoop(service: AttestationServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
|
|
@ -157,13 +157,9 @@ proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
|
|||
|
||||
let sres = vc.getCurrentSlot()
|
||||
if sres.isSome():
|
||||
let currentSlot = sres.get()
|
||||
let validator =
|
||||
block:
|
||||
let res = vc.getValidator(proposerKey)
|
||||
if res.isNone():
|
||||
return
|
||||
res.get()
|
||||
let
|
||||
currentSlot = sres.get()
|
||||
validator = vc.getValidator(proposerKey).valueOr: return
|
||||
await vc.publishBlock(currentSlot, slot, validator)
|
||||
except CancelledError as exc:
|
||||
debug "Block proposing was interrupted", slot = slot,
|
||||
|
|
|
@ -254,6 +254,13 @@ proc `$`*(bn: BeaconNodeServerRef): string =
|
|||
else:
|
||||
bn.logIdent
|
||||
|
||||
proc validatorLog*(key: ValidatorPubKey,
|
||||
index: ValidatorIndex): string =
|
||||
var res = shortLog(key)
|
||||
res.add('@')
|
||||
res.add(Base10.toString(uint64(index)))
|
||||
res
|
||||
|
||||
chronicles.expandIt(BeaconNodeServerRef):
|
||||
node = $it
|
||||
node_index = it.index
|
||||
|
@ -493,17 +500,17 @@ proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
|
|||
vc.beaconClock.now() - deadline
|
||||
|
||||
proc getValidator*(vc: ValidatorClientRef,
|
||||
key: ValidatorPubKey): Option[AttachedValidator] =
|
||||
key: ValidatorPubKey): Opt[AttachedValidator] =
|
||||
let validator = vc.attachedValidators[].getValidator(key)
|
||||
if isNil(validator):
|
||||
warn "Validator not in pool anymore", validator = shortLog(validator)
|
||||
none[AttachedValidator]()
|
||||
info "Validator not in pool anymore", validator = shortLog(validator)
|
||||
Opt.none(AttachedValidator)
|
||||
else:
|
||||
if validator.index.isNone():
|
||||
warn "Validator index is missing", validator = shortLog(validator)
|
||||
none[AttachedValidator]()
|
||||
info "Validator index is missing", validator = shortLog(validator)
|
||||
Opt.none(AttachedValidator)
|
||||
else:
|
||||
some(validator)
|
||||
Opt.some(validator)
|
||||
|
||||
proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
|
||||
# If schedule is present, it MUST not be empty.
|
||||
|
@ -522,6 +529,47 @@ proc getSubcommitteeIndex*(index: IndexInSyncCommittee): SyncSubcommitteeIndex =
|
|||
proc currentSlot*(vc: ValidatorClientRef): Slot =
|
||||
vc.beaconClock.now().slotOrZero()
|
||||
|
||||
proc addDoppelganger*(vc: ValidatorClientRef,
|
||||
validators: openArray[AttachedValidator]) =
|
||||
if vc.config.doppelgangerDetection:
|
||||
let startEpoch = vc.currentSlot().epoch()
|
||||
var
|
||||
check: seq[string]
|
||||
skip: seq[string]
|
||||
exist: seq[string]
|
||||
|
||||
for validator in validators:
|
||||
let
|
||||
vindex = validator.index.get()
|
||||
state =
|
||||
if (startEpoch == GENESIS_EPOCH) and
|
||||
(validator.startSlot == GENESIS_SLOT):
|
||||
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
|
||||
lastAttempt: DoppelgangerAttempt.None,
|
||||
status: DoppelgangerStatus.Passed)
|
||||
else:
|
||||
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
|
||||
lastAttempt: DoppelgangerAttempt.None,
|
||||
status: DoppelgangerStatus.Checking)
|
||||
res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state)
|
||||
|
||||
if res:
|
||||
exist.add(validatorLog(validator.pubkey, vindex))
|
||||
else:
|
||||
if state.status == DoppelgangerStatus.Checking:
|
||||
check.add(validatorLog(validator.pubkey, vindex))
|
||||
else:
|
||||
skip.add(validatorLog(validator.pubkey, vindex))
|
||||
info "Validator's doppelganger protection activated",
|
||||
validators_count = len(validators),
|
||||
pending_check_count = len(check),
|
||||
skipped_count = len(skip),
|
||||
exist_count = len(exist)
|
||||
debug "Validator's doppelganger protection dump",
|
||||
checking_validators = check,
|
||||
skip_validators = skip,
|
||||
existing_validators = exist
|
||||
|
||||
proc addDoppelganger*(vc: ValidatorClientRef, validator: AttachedValidator) =
|
||||
logScope:
|
||||
validator = shortLog(validator)
|
||||
|
@ -627,15 +675,39 @@ proc doppelgangerCheck*(vc: ValidatorClientRef,
|
|||
validator: AttachedValidator): bool =
|
||||
if vc.config.doppelgangerDetection:
|
||||
if validator.index.isNone():
|
||||
return false
|
||||
if validator.startSlot > GENESIS_SLOT:
|
||||
let
|
||||
vindex = validator.index.get()
|
||||
default = DoppelgangerState(status: DoppelgangerStatus.None)
|
||||
state = vc.doppelgangerDetection.validators.getOrDefault(vindex,
|
||||
default)
|
||||
state.status == DoppelgangerStatus.Passed
|
||||
false
|
||||
else:
|
||||
true
|
||||
if validator.startSlot > GENESIS_SLOT:
|
||||
let
|
||||
vindex = validator.index.get()
|
||||
default = DoppelgangerState(status: DoppelgangerStatus.None)
|
||||
state = vc.doppelgangerDetection.validators.getOrDefault(vindex,
|
||||
default)
|
||||
state.status == DoppelgangerStatus.Passed
|
||||
else:
|
||||
true
|
||||
else:
|
||||
true
|
||||
|
||||
proc doppelgangerCheck*(vc: ValidatorClientRef,
|
||||
key: ValidatorPubKey): bool =
|
||||
let validator = vc.getValidator(key).valueOr: return false
|
||||
vc.doppelgangerCheck(validator)
|
||||
|
||||
proc doppelgangerFilter*(
|
||||
vc: ValidatorClientRef,
|
||||
duties: openArray[DutyAndProof]
|
||||
): tuple[filtered: seq[DutyAndProof], skipped: seq[string]] =
|
||||
let defstate = DoppelgangerState(status: DoppelgangerStatus.None)
|
||||
var
|
||||
pending: seq[string]
|
||||
ready: seq[DutyAndProof]
|
||||
for duty in duties:
|
||||
let
|
||||
vindex = duty.data.validator_index
|
||||
vkey = duty.data.pubkey
|
||||
if vc.doppelgangerCheck(vkey):
|
||||
ready.add(duty)
|
||||
else:
|
||||
pending.add(validatorLog(vkey, vindex))
|
||||
(ready, pending)
|
||||
|
|
|
@ -77,18 +77,26 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
|
|||
|
||||
offset += arraySize
|
||||
|
||||
var
|
||||
missing: seq[string]
|
||||
updated: seq[string]
|
||||
list: seq[AttachedValidator]
|
||||
|
||||
for item in validators:
|
||||
if item.validator.pubkey notin vc.attachedValidators[]:
|
||||
warn "Beacon node returned missing validator",
|
||||
pubkey = item.validator.pubkey, index = item.index
|
||||
var validator = vc.attachedValidators[].getValidator(item.validator.pubkey)
|
||||
if isNil(validator):
|
||||
missing.add(validatorLog(item.validator.pubkey, item.index))
|
||||
else:
|
||||
debug "Local validator updated with index",
|
||||
pubkey = item.validator.pubkey, index = item.index
|
||||
vc.attachedValidators[].updateValidator(item.validator.pubkey,
|
||||
item.index)
|
||||
# Adding validator for doppelganger detection.
|
||||
vc.addDoppelganger(
|
||||
vc.attachedValidators[].getValidator(item.validator.pubkey))
|
||||
validator.index = Opt.some(item.index)
|
||||
updated.add(validatorLog(item.validator.pubkey, item.index))
|
||||
list.add(validator)
|
||||
|
||||
if len(updated) > 0:
|
||||
info "Validator indices updated", missing_validators = len(missing),
|
||||
updated_validators = len(updated)
|
||||
trace "Validator indices update dump", missing_validators = missing,
|
||||
updated_validators = updated
|
||||
vc.addDoppelganger(list)
|
||||
|
||||
proc pollForAttesterDuties*(vc: ValidatorClientRef,
|
||||
epoch: Epoch): Future[int] {.async.} =
|
||||
|
|
|
@ -33,18 +33,10 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
|
|||
vc = service.client
|
||||
fork = vc.forkAtEpoch(slot.epoch)
|
||||
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
|
||||
|
||||
vindex = duty.data.validator_index
|
||||
subcommitteeIdx = getSubcommitteeIndex(
|
||||
duty.data.validator_sync_committee_index)
|
||||
|
||||
validator =
|
||||
block:
|
||||
let res = vc.getValidator(duty.data.pubkey)
|
||||
if res.isNone():
|
||||
return false
|
||||
res.get()
|
||||
|
||||
validator = vc.getValidator(duty.data.pubkey).valueOr: return false
|
||||
message =
|
||||
block:
|
||||
let res = await getSyncCommitteeMessage(validator, fork,
|
||||
|
|
Loading…
Reference in New Issue