Implementation for REST /eth/v1/validator/liveness/{epoch} endpoint. (#4381)

Switch VC to use /eth/v1/validator/liveness/{epoch} endpoint instead of nimbus one.
This commit is contained in:
Eugene Kabanov 2022-12-06 13:29:00 +02:00 committed by GitHub
parent 415b11aa67
commit a311f04a19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 221 additions and 6 deletions

View File

@ -4,7 +4,7 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import std/[typetraits, sets]
import std/[typetraits, sets, sequtils]
import stew/[results, base10], chronicles
import ".."/[beacon_chain_db, beacon_node],
".."/networking/eth2_network,
@ -857,3 +857,66 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
signedValidatorRegistration
return RestApiResponse.response("", Http200, "text/plain")
router.api(MethodPost, "/eth/v1/validator/liveness/{epoch}") do (
epoch: Epoch, contentBody: Option[ContentBody]) -> RestApiResponse:
let
qepoch =
block:
if epoch.isErr():
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
$epoch.error())
let
res = epoch.get()
wallEpoch = node.currentSlot().epoch()
nextEpoch =
if wallEpoch == FAR_FUTURE_EPOCH:
wallEpoch
else:
wallEpoch + 1
prevEpoch = get_previous_epoch(wallEpoch)
if (res < prevEpoch) or (res > nextEpoch):
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
"Requested epoch is more than one epoch from current epoch")
res
indexList =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(seq[RestValidatorIndex], contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidValidatorIndexValueError,
$dres.error())
var
res: seq[ValidatorIndex]
dupset: HashSet[ValidatorIndex]
let items = dres.get()
for item in items:
let vres = item.toValidatorIndex()
if vres.isErr():
case vres.error()
of ValidatorIndexError.TooHighValue:
return RestApiResponse.jsonError(Http400,
TooHighValidatorIndexValueError)
of ValidatorIndexError.UnsupportedValue:
return RestApiResponse.jsonError(Http500,
UnsupportedValidatorIndexValueError)
let index = vres.get()
if index in dupset:
return RestApiResponse.jsonError(Http400,
DuplicateValidatorIndexArrayError)
dupset.incl(index)
res.add(index)
if len(res) == 0:
return RestApiResponse.jsonError(Http400,
EmptyValidatorIndexArrayError)
res
response = indexList.mapIt(
RestLivenessItem(
index: it,
is_live: node.attestationPool[].validatorSeenAtEpoch(qepoch, it)
)
)
return RestApiResponse.jsonResponse(response)

View File

@ -2384,6 +2384,43 @@ proc readValue*(reader: var JsonReader[RestJson],
value = RestActivityItem(index: index.get(), epoch: epoch.get(),
active: active.get())
## RestLivenessItem
proc writeValue*(writer: var JsonWriter[RestJson],
value: RestLivenessItem) {.
raises: [IOError, Defect].} =
writer.beginRecord()
writer.writeField("index", value.index)
writer.writeField("is_live", value.is_live)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson],
value: var RestLivenessItem) {.
raises: [SerializationError, IOError, Defect].} =
var index: Option[ValidatorIndex]
var isLive: Option[bool]
for fieldName in readObjectFields(reader):
case fieldName
of "index":
if index.isSome():
reader.raiseUnexpectedField(
"Multiple `index` fields found", "RestLivenessItem")
index = some(reader.readValue(ValidatorIndex))
of "is_live":
if isLive.isSome():
reader.raiseUnexpectedField(
"Multiple `is_live` fields found", "RestLivenessItem")
isLive = some(reader.readValue(bool))
else:
discard
if index.isNone():
reader.raiseUnexpectedValue("Missing or empty `index` value")
if isLive.isNone():
reader.raiseUnexpectedValue("Missing or empty `is_live` value")
value = RestLivenessItem(index: index.get(), is_live: isLive.get())
## HeadChangeInfoObject
proc writeValue*(writer: var JsonWriter[RestJson],
value: HeadChangeInfoObject) {.

View File

@ -252,6 +252,10 @@ type
epoch*: Epoch
active*: bool
RestLivenessItem* = object
index*: ValidatorIndex
is_live*: bool
PrepareBeaconProposer* = object
validator_index*: ValidatorIndex
fee_recipient*: Eth1Address
@ -609,6 +613,7 @@ type
ProduceSyncCommitteeContributionResponse* = DataEnclosedObject[SyncCommitteeContribution]
SubmitBlindedBlockResponse* = DataEnclosedObject[bellatrix.ExecutionPayload]
GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]]
GetValidatorsLivenessResponse* = DataEnclosedObject[seq[RestLivenessItem]]
func `==`*(a, b: RestValidatorIndex): bool =
uint64(a) == uint64(b)

View File

@ -99,3 +99,9 @@ proc registerValidator*(body: seq[SignedValidatorRegistrationV1]): RestPlainResp
rest, endpoint: "/eth/v1/validator/register_validator",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/registerValidator
proc getValidatorsLiveness*(epoch: Epoch,
body: seq[ValidatorIndex]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/liveness/{epoch}",
meth: MethodPost.}

View File

@ -2241,3 +2241,107 @@ proc registerValidator*(
status = response.status, endpoint = apiResponse.node,
message = response.getErrorMessage()
return count
proc getValidatorsLiveness*(
vc: ValidatorClientRef, epoch: Epoch,
validators: seq[ValidatorIndex]
): Future[GetValidatorsLivenessResponse] {.async.} =
logScope: request = "getValidatorsActivity"
let resp = vc.onceToAll(RestPlainResponse, SlotDuration,
{BeaconNodeRole.Duties},
getValidatorsLiveness(it, epoch, validators))
case resp.status
of ApiOperation.Timeout:
debug "Unable to perform validator's liveness request in time",
timeout = SlotDuration
return GetValidatorsLivenessResponse()
of ApiOperation.Interrupt:
debug "Validator's liveness request was interrupted"
return GetValidatorsLivenessResponse()
of ApiOperation.Failure:
debug "Unexpected error happened while receiving validator's liveness"
return GetValidatorsLivenessResponse()
of ApiOperation.Success:
let defaultLiveness = RestLivenessItem(index: ValidatorIndex(high(uint32)))
var activities: Table[ValidatorIndex, RestLivenessItem]
for apiResponse in resp.data:
if apiResponse.data.isErr():
debug "Unable to retrieve validators liveness data",
endpoint = apiResponse.node, error = apiResponse.data.error()
else:
let response = apiResponse.data.get()
case response.status
of 200:
let res = decodeBytes(GetValidatorsLivenessResponse,
response.data, response.contentType)
if res.isOk():
let list = res.get().data
if len(list) != len(validators):
debug "Received incomplete validators liveness response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list)
continue
else:
var updated = 0
for item in list:
activities.withValue(item.index, stored):
if item.is_live:
stored[].is_live = true
inc(updated)
do:
activities[item.index] = item
inc(updated)
debug "Received validators liveness response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list),
updated_count = updated
else:
debug "Received invalid/incomplete response",
endpoint = apiResponse.node, error_message = res.error()
apiResponse.node.status = RestBeaconNodeStatus.Incompatible
continue
of 400:
debug "Server reports invalid request",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Incompatible
continue
of 500:
debug "Server reports internal error",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Offline
continue
of 503:
debug "Server reports that it not in sync",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.NotSynced
continue
else:
debug "Server reports unexpected error code",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Offline
continue
var response =
block:
var res: seq[RestLivenessItem]
for vindex in validators:
let item = activities.getOrDefault(vindex, defaultLiveness)
if item == defaultLiveness:
debug "Validator is missing in response",
validator_index = vindex
return GetValidatorsLivenessResponse()
else:
res.add(item)
res
return GetValidatorsLivenessResponse(data: response)

View File

@ -30,7 +30,7 @@ proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} =
await sleepAsync(sleepTime)
proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
activities: GetValidatorsActivityResponse) =
activities: GetValidatorsLivenessResponse) =
let vc = service.client
if len(activities.data) == 0:
debug "Unable to monitor validator's activity for epoch", epoch = epoch
@ -39,10 +39,10 @@ proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
value.epochsCount = 0'u64
value.lastAttempt = DoppelgangerAttempt.Failure
else:
for activity in activities.data:
let vindex = activity.index
for item in activities.data:
let vindex = item.index
vc.doppelgangerDetection.validators.withValue(vindex, value):
if activity.active:
if item.is_live:
if value.status == DoppelgangerStatus.Checking:
value.epochsCount = 0'u64
value.lastAttempt = DoppelgangerAttempt.SuccessTrue
@ -86,7 +86,7 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
currentEpoch - 1'u64
validators = vc.getCheckingList()
if len(validators) > 0:
let activities = await vc.getValidatorsActivity(previousEpoch,
let activities = await vc.getValidatorsLiveness(previousEpoch,
validators)
service.processActivities(previousEpoch, activities)
else: