VC: doppelganger protection (#3877)

* Improve fallback_service.

* Improve logging in fallback_service.

* Apply signal handling for all stages.

* Fix some logging statements.

* Add doppelganger REST api endpoint.
Add some structures to VC.

* Add client API call implementation.

* Initial fix & refactor onceToAll()
Add doppelganger service.
Add doppelganger helpers.

* Add doppelganger checks.

* Move doppelganger log messages to higher levels.

* Fix firstSuccess().

* Bump chronos.

* Post rebase fixes.

* Proper chronos bump.

* Address review comments.

* Attempt to fix finalization test issue.

* Fix nimbus_signing_node.

* Mark validators added at GENESIS_SLOT in GENESIS_EPOCH as having passed doppelganger validation.

* Do not send empty requests to server.

* Fix log statement.

* Address review comments and re-raise cancellations.

Co-authored-by: zah <zahary@gmail.com>

View File

@@ -767,6 +767,17 @@ type
abbr: "d"
name: "data-dir" .}: OutDir
doppelgangerDetection* {.
# TODO This description is shared between the BN and the VC.
# Extract it into a constant (a confutils fix may be needed).
desc: "If enabled, the validator client prudently listens for 2 epochs " &
"for attestations from a validator with the same index " &
"(a doppelganger), before sending an attestation itself. This " &
"protects against slashing (due to double-voting) but means you " &
"will miss two attestations when restarting."
defaultValue: true
name: "doppelganger-detection" .}: bool
nonInteractive* {.
desc: "Do not display interative prompts. Quit on missing configuration"
name: "non-interactive" .}: bool

View File

@@ -755,3 +755,11 @@ proc prune*(pool: var AttestationPool) =
# If pruning fails, it's likely the result of a bug - this shouldn't happen
# but we'll keep running hoping that the fork choice will recover eventually
error "Couldn't prune fork choice, bug?", err = v.error()
proc validatorSeenAtEpoch*(pool: var AttestationPool, epoch: Epoch,
vindex: ValidatorIndex): bool =
if uint64(vindex) < lenu64(pool.nextAttestationEpoch):
let mark = pool.nextAttestationEpoch[vindex]
(mark.subnet > epoch) or (mark.aggregate > epoch)
else:
false
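
A validator thus counts as seen at a given epoch when the pool holds a subnet or aggregate attestation mark from it beyond that epoch. A minimal usage sketch, assuming `pool` is an initialized AttestationPool; the index and epoch are examples:

# Hypothetical check: did validator 42 show attestation activity at epoch 10?
let seen = pool.validatorSeenAtEpoch(Epoch(10), ValidatorIndex(42))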

View File

@@ -100,7 +100,9 @@ proc initValidators(sn: var SigningNode): bool =
for keystore in listLoadableKeystores(sn.config):
case keystore.kind
of KeystoreKind.Local:
sn.attachedValidators.addLocalValidator(keystore)
# The signing node is not supposed to know the genesis time, so we just
# set `start_slot` to GENESIS_SLOT.
sn.attachedValidators.addLocalValidator(keystore, GENESIS_SLOT)
publicKeyIdents.add("\"0x" & keystore.pubkey.toHex() & "\"")
of KeystoreKind.Remote:
error "Signing node do not support remote validators",

View File

@@ -6,29 +6,34 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import validator_client/[common, fallback_service, duties_service,
attestation_service, fork_service,
sync_committee_service]
sync_committee_service, doppelganger_service]
proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes)
var nodes = vc.beaconNodes
while true:
var pending: seq[Future[RestResponse[GetGenesisResponse]]]
var pendingRequests: seq[Future[RestResponse[GetGenesisResponse]]]
for node in nodes:
debug "Requesting genesis information", endpoint = node
pending.add(node.client.getGenesis())
pendingRequests.add(node.client.getGenesis())
try:
await allFutures(pending)
await allFutures(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
debug "Genesis information request was interrupted"
for future in pendingRequests:
if not(future.finished()):
pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
let (errorNodes, genesisList) =
block:
var gres: seq[RestGenesis]
var bres: seq[BeaconNodeServerRef]
for i in 0 ..< len(pending):
let fut = pending[i]
for i in 0 ..< len(pendingRequests):
let fut = pendingRequests[i]
if fut.done():
let resp = fut.read()
if resp.status == 200:
@@ -87,7 +92,7 @@ proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} =
continue
else:
duplicates.add(pubkey)
vc.attachedValidators.addLocalValidator(keystore)
vc.addValidator(keystore)
return true
proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
@@ -157,6 +162,7 @@ proc asyncInit(vc: ValidatorClientRef) {.async.} =
vc.fallbackService = await FallbackServiceRef.init(vc)
vc.forkService = await ForkServiceRef.init(vc)
vc.dutiesService = await DutiesServiceRef.init(vc)
vc.doppelgangerService = await DoppelgangerServiceRef.init(vc)
vc.attestationService = await AttestationServiceRef.init(vc)
vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc)
except CancelledError:
@@ -168,6 +174,7 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} =
vc.fallbackService.start()
vc.forkService.start()
vc.dutiesService.start()
vc.doppelgangerService.start()
vc.attestationService.start()
vc.syncCommitteeService.start()
@@ -195,6 +202,7 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} =
pending.add(vc.fallbackService.stop())
pending.add(vc.forkService.stop())
pending.add(vc.dutiesService.stop())
pending.add(vc.doppelgangerService.stop())
pending.add(vc.attestationService.stop())
pending.add(vc.syncCommitteeService.stop())
await allFutures(pending)

View File

@@ -13,7 +13,7 @@ import
./rest_utils,
../eth1/eth1_monitor,
../validators/validator_duties,
../spec/forks,
../spec/[forks, beacon_time],
../beacon_node, ../nimbus_binary_common
export rest_utils
@@ -263,6 +263,61 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonError(Http503,
"Compile with '-d:chronosFutureTracking' to get this request working")
router.api(MethodPost, "/nimbus/v1/validator/activity/{epoch}") do (
epoch: Epoch, contentBody: Option[ContentBody]) -> RestApiResponse:
let indexList =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(seq[RestValidatorIndex], contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidValidatorIndexValueError,
$dres.error())
var res: HashSet[ValidatorIndex]
let items = dres.get()
for item in items:
let vres = item.toValidatorIndex()
if vres.isErr():
case vres.error()
of ValidatorIndexError.TooHighValue:
return RestApiResponse.jsonError(Http400,
TooHighValidatorIndexValueError)
of ValidatorIndexError.UnsupportedValue:
return RestApiResponse.jsonError(Http500,
UnsupportedValidatorIndexValueError)
res.incl(vres.get())
if len(res) == 0:
return RestApiResponse.jsonError(Http400,
EmptyValidatorIndexArrayError)
res
let qepoch =
block:
if epoch.isErr():
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
$epoch.error())
let
res = epoch.get()
wallEpoch = node.currentSlot().epoch()
nextEpoch =
if wallEpoch == FAR_FUTURE_EPOCH:
wallEpoch
else:
wallEpoch + 1
prevEpoch = get_previous_epoch(wallEpoch)
if (res < prevEpoch) or (res > nextEpoch):
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
"Requested epoch is more than one epoch from current epoch")
res
let response = indexList.mapIt(
RestActivityItem(
index: it,
epoch: qepoch,
active: node.attestationPool[].validatorSeenAtEpoch(qepoch, it)
)
)
return RestApiResponse.jsonResponse(response)
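
An exchange with this endpoint could look as follows. The `data` wrapper follows from GetValidatorsActivityResponse being a DataEnclosedObject; the string encoding of indices and epochs assumes the usual beacon-API convention for uint64 values:

POST /nimbus/v1/validator/activity/10
["1", "2"]

HTTP/1.1 200 OK
{"data": [{"index": "1", "epoch": "10", "active": true},
          {"index": "2", "epoch": "10", "active": false}]}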
router.api(MethodGet, "/nimbus/v1/debug/gossip/peers") do (
) -> RestApiResponse:

View File

@@ -2097,6 +2097,52 @@ proc readValue*(reader: var JsonReader[RestJson],
keystores: keystores, passwords: passwords, slashing_protection: slashing
)
proc writeValue*(writer: var JsonWriter[RestJson],
value: RestActivityItem) {.
raises: [IOError, Defect].} =
writer.beginRecord()
writer.writeField("index", value.index)
writer.writeField("epoch", value.epoch)
writer.writeField("active", value.active)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson],
value: var RestActivityItem) {.
raises: [SerializationError, IOError, Defect].} =
var index: Option[ValidatorIndex]
var epoch: Option[Epoch]
var active: Option[bool]
for fieldName in readObjectFields(reader):
case fieldName
of "index":
if index.isSome():
reader.raiseUnexpectedField(
"Multiple `index` fields found", "RestActivityItem")
index = some(reader.readValue(ValidatorIndex))
of "epoch":
if epoch.isSome():
reader.raiseUnexpectedField(
"Multiple `epoch` fields found", "RestActivityItem")
epoch = some(reader.readValue(Epoch))
of "active":
if active.isSome():
reader.raiseUnexpectedField(
"Multiple `active` fields found", "RestActivityItem")
active = some(reader.readValue(bool))
else:
discard
if index.isNone():
reader.raiseUnexpectedValue("Missing or empty `index` value")
if epoch.isNone():
reader.raiseUnexpectedValue("Missing or empty `epoch` value")
if active.isNone():
reader.raiseUnexpectedValue("Missing or empty `active` value")
value = RestActivityItem(index: index.get(), epoch: epoch.get(),
active: active.get())
proc dump*(value: KeystoresAndSlashingProtection): string {.
raises: [IOError, Defect].} =
var stream = memoryOutput()

View File

@@ -11,11 +11,11 @@ import
"."/[
rest_beacon_calls, rest_config_calls, rest_debug_calls,
rest_node_calls, rest_validator_calls, rest_keymanager_calls,
rest_common
rest_nimbus_calls, rest_common
]
export
chronos, client,
rest_beacon_calls, rest_config_calls, rest_debug_calls,
rest_node_calls, rest_validator_calls, rest_keymanager_calls,
rest_common
rest_nimbus_calls, rest_common

View File

@@ -0,0 +1,17 @@
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos, presto/client,
"."/[rest_types, eth2_rest_serialization]
proc getValidatorsActivity*(epoch: Epoch,
body: seq[ValidatorIndex]
): RestPlainResponse {.
rest, endpoint: "/nimbus/v1/validator/activity/{epoch}",
meth: MethodPost.}
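
The `rest` pragma generates the client-side proc with a RestClientRef as its first parameter. A hedged sketch of a call, assuming `client` is an already-constructed RestClientRef and the epoch and indices are examples:

# Hypothetical call from inside an async proc.
let resp = await client.getValidatorsActivity(
  Epoch(10), @[ValidatorIndex(1), ValidatorIndex(2)])
# `resp` is a RestPlainResponse; its body still needs decoding, e.g. with
# decodeBytes(GetValidatorsActivityResponse, resp.data, resp.contentType).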

View File

@@ -234,6 +234,11 @@ type
discovery_addresses*: seq[string]
metadata*: RestMetadata
RestActivityItem* = object
index*: ValidatorIndex
epoch*: Epoch
active*: bool
RestPublishedSignedBeaconBlock* = distinct ForkedSignedBeaconBlock
RestPublishedBeaconBlock* = distinct ForkedBeaconBlock
@@ -569,6 +574,7 @@ type
ProduceBlockResponseV2* = ForkedBeaconBlock
ProduceSyncCommitteeContributionResponse* = DataEnclosedObject[SyncCommitteeContribution]
SubmitBlindedBlockResponse* = DataEnclosedObject[bellatrix.ExecutionPayload]
GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]]
func `==`*(a, b: RestValidatorIndex): bool =
uint64(a) == uint64(b)

View File

@@ -10,6 +10,136 @@ type
ApiOperation = enum
Success, Timeout, Failure, Interrupt
ApiNodeResponse*[T] = object
node*: BeaconNodeServerRef
data*: ApiResponse[T]
ApiResponseSeq*[T] = object
status*: ApiOperation
data*: seq[ApiNodeResponse[T]]
template onceToAll*(vc: ValidatorClientRef, responseType: typedesc,
timeout: Duration,
body: untyped): ApiResponseSeq[responseType] =
var it {.inject.}: RestClientRef
type BodyType = typeof(body)
var timerFut =
if timeout != InfiniteDuration:
sleepAsync(timeout)
else:
nil
let onlineNodes =
try:
if not isNil(timerFut):
await vc.waitOnlineNodes(timerFut)
vc.onlineNodes()
except CancelledError as exc:
var default: seq[BeaconNodeServerRef]
if not(isNil(timerFut)) and not(timerFut.finished()):
await timerFut.cancelAndWait()
raise exc
except CatchableError as exc:
# This case should never happen.
error "Unexpected exception while waiting for beacon nodes",
err_name = $exc.name, err_msg = $exc.msg
var default: seq[BeaconNodeServerRef]
default
if len(onlineNodes) == 0:
# Timeout exceeded or operation was cancelled
ApiResponseSeq[responseType](status: ApiOperation.Timeout)
else:
let (pendingRequests, pendingNodes) =
block:
var requests: seq[BodyType]
var nodes: seq[BeaconNodeServerRef]
for node {.inject.} in onlineNodes:
it = node.client
let fut = body
requests.add(fut)
nodes.add(node)
(requests, nodes)
let status =
try:
if isNil(timerFut):
await allFutures(pendingRequests)
ApiOperation.Success
else:
let waitFut = allFutures(pendingRequests)
discard await race(waitFut, timerFut)
if not(waitFut.finished()):
await waitFut.cancelAndWait()
ApiOperation.Timeout
else:
if not(timerFut.finished()):
await timerFut.cancelAndWait()
ApiOperation.Success
except CancelledError as exc:
# We should cancel all pending requests and the timer before returning
# the result.
var pendingCancel: seq[Future[void]]
for fut in pendingRequests:
if not(fut.finished()):
pendingCancel.add(fut.cancelAndWait())
if not(isNil(timerFut)) and not(timerFut.finished()):
pendingCancel.add(timerFut.cancelAndWait())
await allFutures(pendingCancel)
raise exc
except CatchableError:
# This should not happen, because allFutures() and race() do not
# raise exceptions.
ApiOperation.Failure
let responses =
block:
var res: seq[ApiNodeResponse[responseType]]
for idx, pnode in pendingNodes.pairs():
let apiResponse =
block:
let fut = pendingRequests[idx]
if fut.finished():
if fut.failed() or fut.cancelled():
let exc = fut.readError()
ApiNodeResponse[responseType](
node: pnode,
data: ApiResponse[responseType].err("[" & $exc.name & "] " &
$exc.msg)
)
else:
ApiNodeResponse[responseType](
node: pnode,
data: ApiResponse[responseType].ok(fut.read())
)
else:
case status
of ApiOperation.Interrupt:
pendingNodes[idx].status = RestBeaconNodeStatus.Offline
ApiNodeResponse[responseType](
node: pnode,
data: ApiResponse[responseType].err("Operation interrupted")
)
of ApiOperation.Timeout:
pendingNodes[idx].status = RestBeaconNodeStatus.Offline
ApiNodeResponse[responseType](
node: pnode,
data: ApiResponse[responseType].err(
"Operation timeout exceeded")
)
of ApiOperation.Success, ApiOperation.Failure:
# This should not happen: all futures are finished at this point,
# and `Failure` is handled in the finished-future branch above.
ApiNodeResponse[responseType](
node: pnode,
data: ApiResponse[responseType].err("Unexpected error")
)
res.add(apiResponse)
res
ApiResponseSeq[responseType](status: status, data: responses)
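
Where firstSuccessTimeout stops at the first node that answers, onceToAll fans the injected call out to every online node at once and returns the per-node results. The call site added later in this change shows the intended shape:

# `it` is injected by the template and bound to each node's RestClientRef;
# the whole broadcast is bounded by a one-slot timeout.
let resp = vc.onceToAll(RestPlainResponse, SlotDuration,
                        getValidatorsActivity(it, epoch, validators))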
template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
timeout: Duration, body: untyped,
handlers: untyped): untyped =
@@ -822,3 +952,104 @@ proc prepareSyncCommitteeSubnets*(
raise newException(ValidatorApiError,
"Unable to prepare sync committee subnet")
proc getValidatorsActivity*(
vc: ValidatorClientRef, epoch: Epoch,
validators: seq[ValidatorIndex]
): Future[GetValidatorsActivityResponse] {.async.} =
logScope: request = "getValidatorsActivity"
let resp = vc.onceToAll(RestPlainResponse, SlotDuration,
getValidatorsActivity(it, epoch, validators))
case resp.status
of ApiOperation.Timeout:
debug "Unable to perform validator's activity request in time",
timeout = SlotDuration
return GetValidatorsActivityResponse()
of ApiOperation.Interrupt:
debug "Validator's activity request was interrupted"
return GetValidatorsActivityResponse()
of ApiOperation.Failure:
debug "Unexpected error happened while receiving validator's activity"
return GetValidatorsActivityResponse()
of ApiOperation.Success:
var activities: seq[RestActivityItem]
for apiResponse in resp.data:
if apiResponse.data.isErr():
debug "Unable to retrieve validators activity data",
endpoint = apiResponse.node, error = apiResponse.data.error()
else:
let
response = apiResponse.data.get()
activity =
block:
var default: seq[RestActivityItem]
case response.status
of 200:
let res = decodeBytes(GetValidatorsActivityResponse,
response.data, response.contentType)
if res.isOk():
let list = res.get().data
if len(list) != len(validators):
debug "Received incomplete validators activity response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list)
default
else:
let isOrdered =
block:
var res = true
for index in 0 ..< len(validators):
if list[index].index != validators[index]:
res = false
break
res
if not(isOrdered):
debug "Received unordered validators activity response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list)
default
else:
debug "Received validators activity response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list)
list
else:
debug "Received invalid/incomplete response",
endpoint = apiResponse.node, error_message = res.error()
apiResponse.node.status = RestBeaconNodeStatus.Incompatible
default
of 400:
debug "Server reports invalid request",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getGenericErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Incompatible
default
of 500:
debug "Server reports internal error",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getGenericErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Offline
default
else:
debug "Server reports unexpected error code",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getGenericErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Offline
default
if len(activity) > 0:
if len(activities) == 0:
activities = activity
else:
# If a single node reports a validator as active, its activity was
# seen by that node, so the combined result is `active`.
for index in 0 ..< len(activities):
if activity[index].active:
activities[index].active = true
return GetValidatorsActivityResponse(data: activities)
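
The merge is a per-validator OR across nodes: if two nodes answer [false, true] and [true, false] for validators [1, 2], the combined result marks both as active. This is deliberately conservative; a single sighting by any node is enough to report activity.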

View File

@@ -31,6 +31,15 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
res.get()
let fork = vc.forkAtEpoch(adata.slot.epoch)
doAssert(validator.index.isSome())
let vindex = validator.index.get()
if not(vc.doppelgangerCheck(validator)):
info "Attestation has not been served (doppelganger check still active)",
slot = duty.data.slot, validator = shortLog(validator),
validator_index = vindex
return false
# TODO: signing_root is recomputed in getAttestationSignature just after,
# but not for locally attached validators.
let signingRoot =
@@ -38,7 +47,6 @@
fork, vc.beaconGenesis.genesis_validators_root, adata)
let attestationRoot = adata.hash_tree_root()
let vindex = validator.index.get()
let notSlashable = vc.attachedValidators.slashingProtection
.registerAttestation(vindex, validator.pubkey,
adata.source.epoch,
@@ -120,8 +128,15 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
vc = service.client
genesisRoot = vc.beaconGenesis.genesis_validators_root
slot = proof.aggregate.data.slot
fork = vc.forkAtEpoch(slot.epoch)
vindex = validator.index.get()
fork = vc.forkAtEpoch(slot.epoch)
if not(vc.doppelgangerCheck(validator)):
info "Aggregate attestation has not been served " &
"(doppelganger check still active)",
slot = slot, validator = shortLog(validator),
validator_index = vindex
return false
debug "Signing aggregate", validator = shortLog(validator),
attestation = shortLog(proof.aggregate), fork = fork

View File

@@ -14,6 +14,13 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
else:
defaultGraffitiBytes()
fork = vc.forkAtEpoch(slot.epoch)
vindex = validator.index.get()
if not(vc.doppelgangerCheck(validator)):
info "Block has not been produced (doppelganger check still active)",
slot = slot, validator = shortLog(validator),
validator_index = vindex
return
debug "Publishing block", validator = shortLog(validator),
delay = vc.getDelay(slot.block_deadline()),

View File

@@ -63,6 +63,9 @@ type
SyncCommitteeServiceRef* = ref object of ClientServiceRef
DoppelgangerServiceRef* = ref object of ClientServiceRef
enabled*: bool
DutyAndProof* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
@@ -116,6 +119,22 @@ type
SyncCommitteeDutiesMap* = Table[ValidatorPubKey, EpochSyncDuties]
ProposerMap* = Table[Epoch, ProposedData]
DoppelgangerStatus* {.pure.} = enum
None, Checking, Passed
DoppelgangerAttempt* {.pure.} = enum
None, Failure, SuccessTrue, SuccessFalse
DoppelgangerState* = object
startEpoch*: Epoch
epochsCount*: uint64
lastAttempt*: DoppelgangerAttempt
status*: DoppelgangerStatus
DoppelgangerDetection* = object
startSlot*: Slot
validators*: Table[ValidatorIndex, DoppelgangerState]
ValidatorClient* = object
config*: ValidatorClientConf
graffitiBytes*: GraffitiBytes
@@ -126,11 +145,13 @@
attestationService*: AttestationServiceRef
blockService*: BlockServiceRef
syncCommitteeService*: SyncCommitteeServiceRef
doppelgangerService*: DoppelgangerServiceRef
runSlotLoopFut*: Future[void]
sigintHandleFut*: Future[void]
sigtermHandleFut*: Future[void]
beaconClock*: BeaconClock
attachedValidators*: ValidatorPool
doppelgangerDetection*: DoppelgangerDetection
forks*: seq[Fork]
forksAvailable*: AsyncEvent
nodesAvailable*: AsyncEvent
@@ -346,3 +367,120 @@ proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
proc getSubcommitteeIndex*(index: IndexInSyncCommittee): SyncSubcommitteeIndex =
SyncSubcommitteeIndex(uint16(index) div SYNC_SUBCOMMITTEE_SIZE)
proc currentSlot*(vc: ValidatorClientRef): Slot =
vc.beaconClock.now().slotOrZero()
proc addDoppelganger*(vc: ValidatorClientRef, validator: AttachedValidator) =
logScope:
validator = shortLog(validator)
if vc.config.doppelgangerDetection:
let
vindex = validator.index.get()
startEpoch = vc.currentSlot().epoch()
state =
if (startEpoch == GENESIS_EPOCH) and
(validator.startSlot == GENESIS_SLOT):
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Passed)
else:
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Checking)
res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state)
if res:
warn "Validator is already in doppelganger table",
validator_index = vindex, start_epoch = startEpoch,
start_slot = validator.startSlot
else:
if state.status == DoppelgangerStatus.Checking:
info "Doppelganger protection activated", validator_index = vindex,
start_epoch = startEpoch, start_slot = validator.startSlot
else:
info "Doppelganger protection skipped", validator_index = vindex,
start_epoch = startEpoch, start_slot = validator.startSlot
proc removeDoppelganger*(vc: ValidatorClientRef, index: ValidatorIndex) =
if vc.config.doppelgangerDetection:
var state: DoppelgangerState
# We do not care about the race where a validator that has not yet been
# added to the doppelganger table is asked to be removed.
discard vc.doppelgangerDetection.validators.pop(index, state)
proc addValidator*(vc: ValidatorClientRef, keystore: KeystoreData) =
let slot = vc.currentSlot()
case keystore.kind
of KeystoreKind.Local:
vc.attachedValidators.addLocalValidator(keystore, none[ValidatorIndex](),
slot)
of KeystoreKind.Remote:
let
httpFlags =
block:
var res: set[HttpClientFlag]
if RemoteKeystoreFlag.IgnoreSSLVerification in keystore.flags:
res.incl({HttpClientFlag.NoVerifyHost,
HttpClientFlag.NoVerifyServerName})
res
prestoFlags = {RestClientFlag.CommaSeparatedArray}
clients =
block:
var res: seq[(RestClientRef, RemoteSignerInfo)]
for remote in keystore.remotes:
let client = RestClientRef.new($remote.url, prestoFlags,
httpFlags)
if client.isErr():
warn "Unable to resolve distributed signer address",
remote_url = $remote.url, validator = $remote.pubkey
else:
res.add((client.get(), remote))
res
if len(clients) > 0:
vc.attachedValidators.addRemoteValidator(keystore, clients,
none[ValidatorIndex](), slot)
else:
warn "Unable to initialize remote validator",
validator = $keystore.pubkey
proc removeValidator*(vc: ValidatorClientRef,
pubkey: ValidatorPubKey) {.async.} =
let validator = vc.attachedValidators.getValidator(pubkey)
if not(isNil(validator)):
if vc.config.doppelgangerDetection:
if validator.index.isSome():
vc.removeDoppelganger(validator.index.get())
case validator.kind
of ValidatorKind.Local:
discard
of ValidatorKind.Remote:
# We must close all the REST clients running for the remote validator.
let pending =
block:
var res: seq[Future[void]]
for item in validator.clients:
res.add(item[0].closeWait())
res
await allFutures(pending)
# Remove validator from ValidatorPool.
vc.attachedValidators.removeValidator(pubkey)
proc doppelgangerCheck*(vc: ValidatorClientRef,
validator: AttachedValidator): bool =
if vc.config.doppelgangerDetection:
if validator.index.isNone():
return false
if validator.startSlot > GENESIS_SLOT:
let
vindex = validator.index.get()
default = DoppelgangerState(status: DoppelgangerStatus.None)
currentEpoch = vc.currentSlot().epoch()
state = vc.doppelgangerDetection.validators.getOrDefault(vindex,
default)
state.status == DoppelgangerStatus.Passed
else:
true
else:
true
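
The gate in doppelgangerCheck reduces to a small decision table:

  detection disabled          -> true
  validator index unknown     -> false
  startSlot == GENESIS_SLOT   -> true (genesis validators are exempt)
  otherwise                   -> status == DoppelgangerStatus.Passed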

View File

@@ -0,0 +1,117 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import chronicles
import common, api
const
ServiceName = "doppelganger_service"
logScope: service = ServiceName
const
DOPPELGANGER_EPOCHS_COUNT = 2
proc getCheckingList*(vc: ValidatorClientRef): seq[ValidatorIndex] =
var res: seq[ValidatorIndex]
for index, value in vc.doppelgangerDetection.validators.pairs():
if value.status == DoppelgangerStatus.Checking:
res.add(index)
res
proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
debug "Sleeping until next epoch", sleep_time = sleepTime
await sleepAsync(sleepTime)
proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
activities: GetValidatorsActivityResponse) =
let vc = service.client
if len(activities.data) == 0:
debug "Unable to monitor validator's activity for epoch", epoch = epoch
for index, value in vc.doppelgangerDetection.validators.mpairs():
if value.status == DoppelgangerStatus.Checking:
value.epochsCount = 0'u64
value.lastAttempt = DoppelgangerAttempt.Failure
else:
for activity in activities.data:
let vindex = activity.index
vc.doppelgangerDetection.validators.withValue(vindex, value):
if activity.active:
if value.status == DoppelgangerStatus.Checking:
value.epochsCount = 0'u64
value.lastAttempt = DoppelgangerAttempt.SuccessTrue
warn "Validator's activity has been seen",
validator_index = vindex, epoch = epoch
vc.gracefulExit.fire()
return
else:
if value.status == DoppelgangerStatus.Checking:
value.lastAttempt = DoppelgangerAttempt.SuccessFalse
if value.epochsCount == DOPPELGANGER_EPOCHS_COUNT:
value.status = DoppelgangerStatus.Passed
info "Validator successfully passed doppelganger detection",
validator_index = vindex
else:
inc(value.epochsCount)
notice "Validator's activity was not seen",
validator_index = vindex, epoch = epoch,
epochs_count = value.epochsCount
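
With DOPPELGANGER_EPOCHS_COUNT = 2, a validator under checking that is never observed needs three quiet polls: the first two raise epochsCount to 1 and then 2, and the third, finding epochsCount == 2, flips the status to Passed. A poll that does observe activity instead fires vc.gracefulExit and the client shuts down rather than risk double-signing.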
proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
if service.enabled:
debug "Service started"
else:
debug "Service disabled because of configuration settings"
return
while true:
let breakLoop =
try:
await service.waitForNextEpoch()
let
currentEpoch = vc.currentSlot().epoch()
previousEpoch =
if currentEpoch == Epoch(0):
currentEpoch
else:
currentEpoch - 1'u64
validators = vc.getCheckingList()
if len(validators) > 0:
let activities = await vc.getValidatorsActivity(previousEpoch,
validators)
service.processActivities(previousEpoch, activities)
else:
debug "No validators found that require doppelganger protection"
discard
false
except CancelledError:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[DoppelgangerServiceRef],
vc: ValidatorClientRef): Future[DoppelgangerServiceRef] {.async.} =
logScope: service = ServiceName
let res = DoppelgangerServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized,
enabled: vc.config.doppelgangerDetection)
debug "Initializing service"
return res
proc start*(service: DoppelgangerServiceRef) =
service.lifeFut = mainLoop(service)

View File

@@ -79,6 +79,9 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
pubkey = item.validator.pubkey, index = item.index
vc.attachedValidators.updateValidator(item.validator.pubkey,
item.index)
# Add the validator for doppelganger detection.
vc.addDoppelganger(
vc.attachedValidators.getValidator(item.validator.pubkey))
proc pollForAttesterDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[int] {.async.} =

View File

@@ -84,14 +84,14 @@ proc findValidator(validators: auto, pubkey: ValidatorPubKey):
some(idx.ValidatorIndex)
proc addLocalValidator(node: BeaconNode, validators: auto,
item: KeystoreData) =
item: KeystoreData, slot: Slot) =
let
pubkey = item.pubkey
index = findValidator(validators, pubkey)
node.attachedValidators[].addLocalValidator(item, index)
node.attachedValidators[].addLocalValidator(item, index, slot)
proc addRemoteValidator(pool: var ValidatorPool, validators: auto,
item: KeystoreData) =
item: KeystoreData, slot: Slot) =
var clients: seq[(RestClientRef, RemoteSignerInfo)]
let httpFlags =
block:
@@ -108,20 +108,22 @@ proc addRemoteValidator(pool: var ValidatorPool, validators: auto,
remote_url = $remote.url, validator = $remote.pubkey
clients.add((client.get(), remote))
let index = findValidator(validators, item.pubkey)
pool.addRemoteValidator(item, clients, index)
pool.addRemoteValidator(item, clients, index, slot)
proc addLocalValidators*(node: BeaconNode,
validators: openArray[KeystoreData]) =
let slot = node.currentSlot()
withState(node.dag.headState):
for item in validators:
node.addLocalValidator(state.data.validators.asSeq(), item)
node.addLocalValidator(state.data.validators.asSeq(), item, slot)
proc addRemoteValidators*(node: BeaconNode,
validators: openArray[KeystoreData]) =
let slot = node.currentSlot()
withState(node.dag.headState):
for item in validators:
node.attachedValidators[].addRemoteValidator(
state.data.validators.asSeq(), item)
state.data.validators.asSeq(), item, slot)
proc addValidators*(node: BeaconNode) =
let (localValidators, remoteValidators) =

View File

@@ -53,6 +53,8 @@ type
# if the validator will be aggregating (in the near future)
slotSignature*: Option[tuple[slot: Slot, signature: ValidatorSig]]
startSlot*: Slot
SignResponse* = Web3SignerDataResponse
SignatureResult* = Result[ValidatorSig, string]
@@ -81,27 +83,32 @@ template count*(pool: ValidatorPool): int =
len(pool.validators)
proc addLocalValidator*(pool: var ValidatorPool, item: KeystoreData,
index: Option[ValidatorIndex]) =
index: Option[ValidatorIndex], slot: Slot) =
doAssert item.kind == KeystoreKind.Local
let pubkey = item.pubkey
let v = AttachedValidator(kind: ValidatorKind.Local, pubkey: pubkey,
index: index, data: item)
index: index, data: item, startSlot: slot)
pool.validators[pubkey] = v
notice "Local validator attached", pubkey, validator = shortLog(v)
notice "Local validator attached", pubkey, validator = shortLog(v),
start_slot = slot
validators.set(pool.count().int64)
proc addLocalValidator*(pool: var ValidatorPool, item: KeystoreData) =
addLocalValidator(pool, item, none[ValidatorIndex]())
proc addLocalValidator*(pool: var ValidatorPool, item: KeystoreData,
slot: Slot) =
addLocalValidator(pool, item, none[ValidatorIndex](), slot)
proc addRemoteValidator*(pool: var ValidatorPool, item: KeystoreData,
clients: seq[(RestClientRef, RemoteSignerInfo)], index: Option[ValidatorIndex]) =
clients: seq[(RestClientRef, RemoteSignerInfo)],
index: Option[ValidatorIndex], slot: Slot) =
doAssert item.kind == KeystoreKind.Remote
let pubkey = item.pubkey
let v = AttachedValidator(kind: ValidatorKind.Remote, pubkey: pubkey,
index: index, data: item, clients: clients)
index: index, data: item, clients: clients,
startSlot: slot)
pool.validators[pubkey] = v
notice "Remote validator attached", pubkey, validator = shortLog(v),
remote_signer = $item.remotes
remote_signer = $item.remotes,
start_slot = slot
validators.set(pool.count().int64)
proc getValidator*(pool: ValidatorPool,