VC: Obol middleware support (#5375)

This commit is contained in:
Eugene Kabanov 2023-11-08 14:03:51 +02:00 committed by GitHub
parent 1c03ea80f8
commit 6bc038e8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1187 additions and 294 deletions

View File

@ -595,13 +595,15 @@ OK: 24/24 Fail: 0/24 Skip: 0/24
OK: 1/1 Fail: 0/1 Skip: 0/1
## Validator Client test suite
```diff
+ /eth/v1/validator/beacon_committee_selections serialization/deserialization test OK
+ /eth/v1/validator/sync_committee_selections serialization/deserialization test OK
+ bestSuccess() API timeout test OK
+ firstSuccessParallel() API timeout test OK
+ getAttestationDataScore() test vectors OK
+ getLiveness() response deserialization test OK
+ normalizeUri() test vectors OK
```
OK: 5/5 Fail: 0/5 Skip: 0/5
OK: 7/7 Fail: 0/7 Skip: 0/7
## Validator change pool testing suite
```diff
+ addValidatorChangeMessage/getAttesterSlashingMessage OK
@ -714,4 +716,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9
---TOTAL---
OK: 403/408 Fail: 0/408 Skip: 5/408
OK: 405/410 Fail: 0/410 Skip: 5/410

View File

@ -1027,6 +1027,11 @@ type
defaultValue: false
name: "payload-builder" .}: bool
distributedEnabled* {.
desc: "Enable usage of Obol middleware (BETA)"
defaultValue: false
name: "distributed".}: bool
beaconNodes* {.
desc: "URL addresses to one or more beacon node HTTP REST APIs",
defaultValue: @[defaultBeaconNodeUri]

View File

@ -20,10 +20,6 @@ export
results, eth2_rest_serialization, blockchain_dag, presto, rest_types,
rest_constants, rest_common
type
ValidatorIndexError* {.pure.} = enum
UnsupportedValue, TooHighValue
func match(data: openArray[char], charset: set[char]): int =
for ch in data:
if ch notin charset:
@ -216,26 +212,6 @@ template strData*(body: ContentBody): string =
bind fromBytes
string.fromBytes(body.data)
func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex,
ValidatorIndexError] =
when sizeof(ValidatorIndex) == 4:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
# On x86 platform Nim allows only `int32` indexes, so all the indexes in
# range `2^31 <= x < 2^32` are not supported.
if uint64(value) <= uint64(high(int32)):
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.UnsupportedValue)
else:
err(ValidatorIndexError.TooHighValue)
elif sizeof(ValidatorIndex) == 8:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.TooHighValue)
else:
doAssert(false, "ValidatorIndex type size is incorrect")
func syncCommitteeParticipants*(forkedState: ForkedHashedBeaconState,
epoch: Epoch
): Result[seq[ValidatorPubKey], cstring] =

View File

@ -122,7 +122,9 @@ type
seq[RestSyncCommitteeSubscription] |
seq[SignedAggregateAndProof] |
seq[SignedValidatorRegistrationV1] |
seq[ValidatorIndex]
seq[ValidatorIndex] |
seq[RestBeaconCommitteeSelection] |
seq[RestSyncCommitteeSelection]
DecodeTypes* =
DataEnclosedObject |

View File

@ -63,6 +63,9 @@ type
ValidatorQueryKind* {.pure.} = enum
Index, Key
ValidatorIndexError* {.pure.} = enum
UnsupportedValue, TooHighValue
ValidatorIdent* = object
case kind*: ValidatorQueryKind
of ValidatorQueryKind.Index:
@ -515,6 +518,17 @@ type
timestamp3*: uint64
delay*: uint64
RestBeaconCommitteeSelection* = object
validator_index*: RestValidatorIndex
slot*: Slot
selection_proof*: ValidatorSig
RestSyncCommitteeSelection* = object
validator_index*: RestValidatorIndex
slot*: Slot
subcommittee_index*: uint64
selection_proof*: ValidatorSig
# Types based on the OAPI yaml file - used in responses to requests
GetBeaconHeadResponse* = DataEnclosedObject[Slot]
GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation]
@ -560,6 +574,8 @@ type
SubmitBlindedBlockResponseDeneb* = DataEnclosedObject[deneb_mev.ExecutionPayloadAndBlobsBundle]
GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]]
GetValidatorsLivenessResponse* = DataEnclosedObject[seq[RestLivenessItem]]
SubmitBeaconCommitteeSelectionsResponse* = DataEnclosedObject[seq[RestBeaconCommitteeSelection]]
SubmitSyncCommitteeSelectionsResponse* = DataEnclosedObject[seq[RestSyncCommitteeSelection]]
RestNodeValidity* {.pure.} = enum
valid = "VALID",
@ -904,3 +920,23 @@ func init*(t: typedesc[RestErrorMessage], code: HttpCode,
message: string, stacktrace: openArray[string]): RestErrorMessage =
RestErrorMessage(code: code.toInt(), message: message,
stacktraces: Opt.some(@stacktrace))
func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex,
ValidatorIndexError] =
when sizeof(ValidatorIndex) == 4:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
# On x86 platform Nim allows only `int32` indexes, so all the indexes in
# range `2^31 <= x < 2^32` are not supported.
if uint64(value) <= uint64(high(int32)):
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.UnsupportedValue)
else:
err(ValidatorIndexError.TooHighValue)
elif sizeof(ValidatorIndex) == 8:
if uint64(value) < VALIDATOR_REGISTRY_LIMIT:
ok(ValidatorIndex(value))
else:
err(ValidatorIndexError.TooHighValue)
else:
doAssert(false, "ValidatorIndex type size is incorrect")

View File

@ -167,4 +167,18 @@ proc getValidatorsLiveness*(epoch: Epoch,
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/liveness/{epoch}",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/getLiveness
proc submitBeaconCommitteeSelectionsPlain*(
body: seq[RestBeaconCommitteeSelection]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/beacon_committee_selections",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/submitBeaconCommitteeSelections
proc submitSyncCommitteeSelectionsPlain*(
body: seq[RestSyncCommitteeSelection]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/sync_committee_selections",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/submitSyncCommitteeSelections

View File

@ -22,6 +22,8 @@ const
ResponseNoSyncError = "Received nosync error response"
ResponseDecodeError = "Received response could not be decoded"
ResponseECNotInSyncError* = "Execution client not in sync"
ResponseNotImplementedError =
"Received endpoint not implemented error response"
type
ApiResponse*[T] = Result[T, string]
@ -772,6 +774,12 @@ template handle500(): untyped {.dirty.} =
node.updateStatus(RestBeaconNodeStatus.InternalError, failure)
failures.add(failure)
template handle501(): untyped {.dirty.} =
let failure = ApiNodeFailure.init(ApiFailure.NotImplemented, RequestName,
strategy, node, response.status, response.getErrorMessage())
node.updateStatus(RestBeaconNodeStatus.Incompatible, failure)
failures.add(failure)
template handle503(): untyped {.dirty.} =
let failure = ApiNodeFailure.init(ApiFailure.NotSynced, RequestName,
strategy, node, response.status, response.getErrorMessage())
@ -2659,3 +2667,195 @@ proc getFinalizedBlockHeader*(
return Opt.some(oldestBlockHeader)
else:
return Opt.none(GetBlockHeaderResponse)
proc submitBeaconCommitteeSelections*(
vc: ValidatorClientRef,
data: seq[RestBeaconCommitteeSelection],
strategy: ApiStrategyKind
): Future[SubmitBeaconCommitteeSelectionsResponse] {.async.} =
const
RequestName = "submitBeaconCommitteeSelections"
var failures: seq[ApiNodeFailure]
case strategy
of ApiStrategyKind.First, ApiStrategyKind.Best:
let res = vc.firstSuccessParallel(
RestPlainResponse,
SubmitBeaconCommitteeSelectionsResponse,
SlotDuration,
ViableNodeStatus,
{BeaconNodeRole.Duties},
submitBeaconCommitteeSelectionsPlain(it, data)):
if apiResponse.isErr():
handleCommunicationError()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err(
apiResponse.error)
else:
let response = apiResponse.get()
case response.status
of 200:
let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse,
response.data, response.contentType)
if res.isErr():
handleUnexpectedData()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err($res.error)
else:
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].ok(res.get())
of 400:
handle400()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err(
ResponseInvalidError)
of 500:
handle500()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err(
ResponseInternalError)
of 501:
handle501()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err(
ResponseNotImplementedError)
of 503:
handle503()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err(
ResponseNoSyncError)
else:
handleUnexpectedCode()
ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err(
ResponseUnexpectedError)
if res.isErr():
raise (ref ValidatorApiError)(msg: res.error, data: failures)
return res.get()
of ApiStrategyKind.Priority:
vc.firstSuccessSequential(RestPlainResponse,
SlotDuration,
ViableNodeStatus,
{BeaconNodeRole.Duties},
submitBeaconCommitteeSelectionsPlain(it, data)):
if apiResponse.isErr():
handleCommunicationError()
false
else:
let response = apiResponse.get()
case response.status
of 200:
let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse,
response.data, response.contentType)
if res.isOk(): return res.get()
handleUnexpectedData()
false
of 400:
handle400()
false
of 500:
handle500()
false
of 501:
handle501()
false
of 503:
handle503()
false
else:
handleUnexpectedCode()
false
raise (ref ValidatorApiError)(
msg: "Failed to submit beacon committee selections", data: failures)
proc submitSyncCommitteeSelections*(
vc: ValidatorClientRef,
data: seq[RestSyncCommitteeSelection],
strategy: ApiStrategyKind
): Future[SubmitSyncCommitteeSelectionsResponse] {.async.} =
const
RequestName = "submitBeaconCommitteeSelections"
var failures: seq[ApiNodeFailure]
case strategy
of ApiStrategyKind.First, ApiStrategyKind.Best:
let res = vc.firstSuccessParallel(
RestPlainResponse,
SubmitSyncCommitteeSelectionsResponse,
SlotDuration,
ViableNodeStatus,
{BeaconNodeRole.Duties},
submitSyncCommitteeSelectionsPlain(it, data)):
if apiResponse.isErr():
handleCommunicationError()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err(
apiResponse.error)
else:
let response = apiResponse.get()
case response.status
of 200:
let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse,
response.data, response.contentType)
if res.isErr():
handleUnexpectedData()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err($res.error)
else:
ApiResponse[SubmitSyncCommitteeSelectionsResponse].ok(res.get())
of 400:
handle400()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err(
ResponseInvalidError)
of 500:
handle500()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err(
ResponseInternalError)
of 501:
handle501()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err(
ResponseNotImplementedError)
of 503:
handle503()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err(
ResponseNoSyncError)
else:
handleUnexpectedCode()
ApiResponse[SubmitSyncCommitteeSelectionsResponse].err(
ResponseUnexpectedError)
if res.isErr():
raise (ref ValidatorApiError)(msg: res.error, data: failures)
return res.get()
of ApiStrategyKind.Priority:
vc.firstSuccessSequential(RestPlainResponse,
SlotDuration,
ViableNodeStatus,
{BeaconNodeRole.Duties},
submitSyncCommitteeSelectionsPlain(it, data)):
if apiResponse.isErr():
handleCommunicationError()
false
else:
let response = apiResponse.get()
case response.status
of 200:
let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse,
response.data, response.contentType)
if res.isOk(): return res.get()
handleUnexpectedData()
false
of 400:
handle400()
false
of 500:
handle500()
false
of 501:
handle501()
false
of 503:
handle503()
false
else:
handleUnexpectedCode()
false
raise (ref ValidatorApiError)(
msg: "Failed to submit sync committee selections", data: failures)

View File

@ -239,7 +239,7 @@ type
ApiFailure* {.pure.} = enum
Communication, Invalid, NotFound, OptSynced, NotSynced, Internal,
UnexpectedCode, UnexpectedResponse, NoError
NotImplemented, UnexpectedCode, UnexpectedResponse, NoError
ApiNodeFailure* = object
node*: BeaconNodeServerRef
@ -255,22 +255,6 @@ type
ValidatorApiError* = object of ValidatorClientError
data*: seq[ApiNodeFailure]
FillSignaturesResult* = object
signaturesRequested*: int
signaturesReceived*: int
AttestationSlotRequest* = object
validator*: AttachedValidator
fork*: Fork
slot*: Slot
SyncCommitteeSlotRequest* = object
validator*: AttachedValidator
fork*: Fork
slot*: Slot
sync_committee_index*: IndexInSyncCommittee
duty*: SyncCommitteeDuty
const
DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH)
DefaultSyncCommitteeDuty* = SyncCommitteeDuty()
@ -386,6 +370,7 @@ proc `$`*(failure: ApiFailure): string =
of ApiFailure.NotSynced: "not-synced"
of ApiFailure.OptSynced: "opt-synced"
of ApiFailure.Internal: "internal-issue"
of ApiFailure.NotImplemented: "not-implemented"
of ApiFailure.UnexpectedCode: "unexpected-code"
of ApiFailure.UnexpectedResponse: "unexpected-data"
of ApiFailure.NoError: "status-update"
@ -1465,240 +1450,6 @@ func `==`*(a, b: SyncCommitteeDuty): bool =
compareUnsorted(a.validator_sync_committee_indices,
b.validator_sync_committee_indices)
proc cmp(x, y: AttestationSlotRequest|SyncCommitteeSlotRequest): int =
cmp(x.slot, y.slot)
func getIndex*(proof: SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee): Opt[int] =
if len(proof) == 0:
return Opt.none(int)
for index, value in proof.pairs():
if value.sync_committee_index == inindex:
return Opt.some(index)
Opt.none(int)
func hasSignature*(proof: SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee,
slot: Slot): bool =
let index = proof.getIndex(inindex).valueOr: return false
proof[index].signatures[int(slot.since_epoch_start())].isSome()
proc setSignature*(proof: var SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee, slot: Slot,
signature: Opt[ValidatorSig]) =
let index = proof.getIndex(inindex).expect(
"EpochSelectionProof should be present at this moment")
proof[index].signatures[int(slot.since_epoch_start())] = signature
proc setSyncSelectionProof*(vc: ValidatorClientRef, pubkey: ValidatorPubKey,
inindex: IndexInSyncCommittee, slot: Slot,
duty: SyncCommitteeDuty,
signature: Opt[ValidatorSig]) =
let
proof =
block:
let length = len(duty.validator_sync_committee_indices)
var res = newSeq[EpochSelectionProof](length)
for i in 0 ..< length:
res[i].sync_committee_index = duty.validator_sync_committee_indices[i]
res
vc.syncCommitteeProofs.
mgetOrPut(slot.epoch(), default(SyncCommitteeProofs)).proofs.
mgetOrPut(pubkey, proof).setSignature(inindex, slot, signature)
proc getSyncCommitteeSelectionProof*(
vc: ValidatorClientRef,
pubkey: ValidatorPubKey,
epoch: Epoch
): Opt[SyncCommitteeSelectionProof] =
vc.syncCommitteeProofs.withValue(epoch, epochProofs):
epochProofs[].proofs.withValue(pubkey, validatorProofs):
return Opt.some(validatorProofs[])
do:
return Opt.none(SyncCommitteeSelectionProof)
do:
return Opt.none(SyncCommitteeSelectionProof)
proc getSyncCommitteeSelectionProof*(
vc: ValidatorClientRef,
pubkey: ValidatorPubKey,
slot: Slot,
inindex: IndexInSyncCommittee
): Opt[ValidatorSig] =
vc.syncCommitteeProofs.withValue(slot.epoch(), epochProofs):
epochProofs[].proofs.withValue(pubkey, validatorProofs):
let index = getIndex(validatorProofs[], inindex).valueOr:
return Opt.none(ValidatorSig)
return validatorProofs[][index].signatures[int(slot.since_epoch_start())]
do:
return Opt.none(ValidatorSig)
do:
return Opt.none(ValidatorSig)
proc fillSyncCommitteeSelectionProofs*(
service: DutiesServiceRef,
start, finish: Slot
): Future[FillSignaturesResult] {.async.} =
let
vc = service.client
genesisRoot = vc.beaconGenesis.genesis_validators_root
var
requests =
block:
var res: seq[SyncCommitteeSlotRequest]
for epoch in start.epoch() .. finish.epoch():
let
fork = vc.forkAtEpoch(epoch)
period = epoch.sync_committee_period()
for duty in vc.syncDutiesForPeriod(period):
let validator = vc.attachedValidators[].
getValidator(duty.pubkey).valueOr:
# Ignore all the validators which are not here anymore
continue
if validator.index.isNone():
# Ignore all the valididators which do not have index yet.
continue
let proof = vc.getSyncCommitteeSelectionProof(duty.pubkey, epoch).
get(default(SyncCommitteeSelectionProof))
for inindex in duty.validator_sync_committee_indices:
for slot in epoch.slots():
if slot < start: continue
if slot > finish: break
if not(proof.hasSignature(inindex, slot)):
res.add(
SyncCommitteeSlotRequest(
validator: validator,
fork: fork,
slot: slot,
duty: duty,
sync_committee_index: inindex))
# We make requests sorted by slot number.
sorted(res, cmp, order = SortOrder.Ascending)
sigres = FillSignaturesResult(signaturesRequested: len(requests))
pendingRequests = requests.mapIt(
FutureBase(getSyncCommitteeSelectionProof(
it.validator, it.fork, genesisRoot, it.slot,
getSubcommitteeIndex(it.sync_committee_index))))
while len(pendingRequests) > 0:
try:
discard await race(pendingRequests)
except CancelledError as exc:
let pending = pendingRequests
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
(requests, pendingRequests) =
block:
var
res1: seq[SyncCommitteeSlotRequest]
res2: seq[FutureBase]
for index, fut in pendingRequests.pairs():
if not(fut.finished()):
res1.add(requests[index])
res2.add(fut)
else:
let
request = requests[index]
signature =
if fut.completed():
let sres = Future[SignatureResult](fut).read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = request.slot.epoch(),
slot = request.slot
Opt.none(ValidatorSig)
else:
inc(sigres.signaturesReceived)
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
vc.setSyncSelectionProof(request.validator.pubkey,
request.sync_committee_index,
request.slot, request.duty,
signature)
(res1, res2)
sigres
proc fillAttestationSelectionProofs*(
service: DutiesServiceRef,
start, finish: Slot
): Future[FillSignaturesResult] {.async.} =
let
vc = service.client
genesisRoot = vc.beaconGenesis.genesis_validators_root
var
requests =
block:
var res: seq[AttestationSlotRequest]
for epoch in start.epoch() .. finish.epoch():
for duty in vc.attesterDutiesForEpoch(epoch):
if (duty.data.slot < start) or (duty.data.slot > finish):
# Ignore all the slots which are not in range.
continue
if duty.slotSig.isSome():
# Ignore all the duties which already has selection proof.
continue
let validator = vc.attachedValidators[].
getValidator(duty.data.pubkey).valueOr:
# Ignore all the validators which are not here anymore
continue
if validator.index.isNone():
# Ignore all the valididators which do not have index yet.
continue
res.add(AttestationSlotRequest(
validator: validator,
slot: duty.data.slot,
fork: vc.forkAtEpoch(duty.data.slot.epoch())
))
# We make requests sorted by slot number.
sorted(res, cmp, order = SortOrder.Ascending)
sigres = FillSignaturesResult(signaturesRequested: len(requests))
pendingRequests = requests.mapIt(
FutureBase(getSlotSignature(it.validator, it.fork, genesisRoot, it.slot)))
while len(pendingRequests) > 0:
try:
discard await race(pendingRequests)
except CancelledError as exc:
let pending = pendingRequests
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc
(requests, pendingRequests) =
block:
var
res1: seq[AttestationSlotRequest]
res2: seq[FutureBase]
for index, fut in pendingRequests.pairs():
if not(fut.finished()):
res1.add(requests[index])
res2.add(fut)
else:
let
request = requests[index]
signature =
if fut.completed():
let sres = Future[SignatureResult](fut).read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = request.slot.epoch(),
slot = request.slot
Opt.none(ValidatorSig)
else:
inc(sigres.signaturesReceived)
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
vc.attesters.withValue(request.validator.pubkey, map):
map[].duties.withValue(request.slot.epoch(), dap):
dap[].slotSig = signature
(res1, res2)
sigres
proc updateRuntimeConfig*(vc: ValidatorClientRef,
node: BeaconNodeServerRef,
info: VCRuntimeConfig): Result[void, string] =

View File

@ -6,8 +6,8 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import std/[sets, sequtils]
import chronicles
import common, api, block_service
import chronicles, metrics
import common, api, block_service, selection_proofs
const
ServiceName = "duties_service"
@ -314,12 +314,23 @@ proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} =
block:
let
moment = Moment.now()
sigres = await service.fillAttestationSelectionProofs(
currentSlot, currentSlot + Epoch(1))
sigres =
await vc.fillAttestationSelectionProofs(currentSlot,
currentSlot + Epoch(AGGREGATION_PRE_COMPUTE_EPOCHS))
if vc.config.distributedEnabled:
debug "Attestation selection proofs have been received",
signatures_requested = sigres.signaturesRequested,
signatures_received = sigres.signaturesReceived,
time = (Moment.now() - moment)
selections_requested = sigres.selections_requested,
selections_received = sigres.selections_received,
selections_processed = sigres.selections_processed,
total_elapsed_time = (Moment.now() - moment)
else:
debug "Attestation selection proofs have been received",
signatures_requested = sigres.signaturesRequested,
signatures_received = sigres.signaturesReceived,
total_elapsed_time = (Moment.now() - moment)
let subscriptions =
block:
@ -393,12 +404,23 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} =
block:
let
moment = Moment.now()
sigres = await service.fillSyncCommitteeSelectionProofs(
currentSlot, currentSlot + Epoch(AGGREGATION_PRE_COMPUTE_EPOCHS))
sigres =
await vc.fillSyncCommitteeSelectionProofs(currentSlot,
currentSlot + Epoch(AGGREGATION_PRE_COMPUTE_EPOCHS))
if vc.config.distributedEnabled:
debug "Sync committee selection proofs have been received",
signatures_requested = sigres.signaturesRequested,
signatures_received = sigres.signaturesReceived,
time = (Moment.now() - moment)
selections_requested = sigres.selections_requested,
selections_received = sigres.selections_received,
selections_processed = sigres.selections_processed,
total_elapsed_time = (Moment.now() - moment)
else:
debug "Sync committee selection proofs have been received",
signatures_requested = sigres.signaturesRequested,
signatures_received = sigres.signaturesReceived,
total_elapsed_time = (Moment.now() - moment)
let
periods =

View File

@ -0,0 +1,514 @@
# beacon_chain
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import std/[algorithm, sequtils]
import chronicles, chronos, metrics
import common, api
{.push raises: [].}
declareGauge client_slot_signatures_time,
"Time used to obtain slot signatures"
declareGauge client_sync_committee_selection_proof_time,
"Time used to obtain sync committee selection proofs"
declareGauge client_obol_aggregated_slot_signatures_time,
"Time used to obtain slot signatures"
declareGauge client_obol_aggregated_sync_committee_selection_proof_time,
"Time used to obtain sync committee selection proofs"
type
FillSignaturesResult* = object
signaturesRequested*: int
signaturesReceived*: int
selectionsRequested*: int
selectionsReceived*: int
selectionsProcessed*: int
AttestationSlotRequest = object
validator: AttachedValidator
fork: Fork
slot: Slot
proof: Opt[ValidatorSig]
future: FutureBase
SyncCommitteeSlotRequest* = object
validator: AttachedValidator
fork: Fork
slot: Slot
sync_committee_index: IndexInSyncCommittee
sub_committee_index: SyncSubcommitteeIndex
duty: SyncCommitteeDuty
proof: Opt[ValidatorSig]
future: FutureBase
template withTimeMetric(metricName, body: untyped): untyped =
let momentTime = Moment.now()
try:
body
finally:
let elapsedTime = Moment.now() - momentTime
metrics.set(metricName, elapsedTime.milliseconds())
proc cmp(x, y: AttestationSlotRequest|SyncCommitteeSlotRequest): int =
cmp(x.slot, y.slot)
proc getAttesterDutiesRequests(
vc: ValidatorClientRef,
start, finish: Slot,
genesisRoot: Eth2Digest
): seq[AttestationSlotRequest] =
var res: seq[AttestationSlotRequest]
for epoch in start.epoch() .. finish.epoch():
for duty in vc.attesterDutiesForEpoch(epoch):
if (duty.data.slot < start) or (duty.data.slot > finish):
# Ignore all the slots which are not in range.
continue
if duty.slotSig.isSome():
# Ignore all the duties which already has selection proof.
continue
let validator = vc.attachedValidators[].
getValidator(duty.data.pubkey).valueOr:
# Ignore all the validators which are not here anymore
continue
if validator.index.isNone():
# Ignore all the validators which do not have index yet.
continue
let
fork = vc.forkAtEpoch(duty.data.slot.epoch())
future = getSlotSignature(validator, fork, genesisRoot, duty.data.slot)
res.add(
AttestationSlotRequest(validator: validator, slot: duty.data.slot,
fork: fork, future: FutureBase(future)))
# We make requests sorted by slot number.
sorted(res, cmp, order = SortOrder.Ascending)
proc fillAttestationSelectionProofs*(
vc: ValidatorClientRef,
start, finish: Slot
): Future[FillSignaturesResult] {.async.} =
let genesisRoot = vc.beaconGenesis.genesis_validators_root
var
requests: seq[AttestationSlotRequest]
sigres: FillSignaturesResult
withTimeMetric(client_slot_signatures_time):
requests = vc.getAttesterDutiesRequests(start, finish, genesisRoot)
sigres.signaturesRequested = len(requests)
var pendingRequests = requests.mapIt(it.future)
while len(pendingRequests) > 0:
try:
discard await race(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await noCancel allFutures(pending)
raise exc
pendingRequests =
block:
var res: seq[FutureBase]
for mreq in requests.mitems():
if isNil(mreq.future): continue
if not(mreq.future.finished()):
res.add(mreq.future)
else:
let signature =
if mreq.future.completed():
let sres = Future[SignatureResult](mreq.future).read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = mreq.slot.epoch(),
slot = mreq.slot
Opt.none(ValidatorSig)
else:
inc(sigres.signaturesReceived)
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
mreq.future = nil
mreq.proof = signature
if signature.isSome():
vc.attesters.withValue(mreq.validator.pubkey, map):
map[].duties.withValue(mreq.slot.epoch(), dap):
dap[].slotSig = signature
res
if vc.config.distributedEnabled:
withTimeMetric(client_obol_aggregated_slot_signatures_time):
let (indexToKey, selections) =
block:
var
res1: Table[ValidatorIndex, Opt[ValidatorPubKey]]
res2: seq[RestBeaconCommitteeSelection]
for mreq in requests.mitems():
if mreq.proof.isSome():
res1[mreq.validator.index.get()] = Opt.some(mreq.validator.pubkey)
res2.add(RestBeaconCommitteeSelection(
validator_index: RestValidatorIndex(mreq.validator.index.get()),
slot: mreq.slot, selection_proof: mreq.proof.get()))
(res1, res2)
sigres.selectionsRequested = len(selections)
if len(selections) == 0:
return sigres
let sresponse =
try:
# Query middleware for aggregated signatures.
await vc.submitBeaconCommitteeSelections(selections,
ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to submit beacon committee selections",
reason = exc.getFailureReason()
return sigres
except CancelledError as exc:
debug "Beacon committee selections processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while trying to submit beacon " &
"committee selections", reason = exc.msg, error = exc.name
return sigres
sigres.selectionsReceived = len(sresponse.data)
for selection in sresponse.data:
let
vindex = selection.validator_index.toValidatorIndex().valueOr:
warn "Invalid validator_index value encountered while processing " &
"beacon committee selections",
validator_index = uint64(selection.validator_index),
reason = $error
continue
selectionProof = selection.selection_proof.load().valueOr:
warn "Invalid signature encountered while processing " &
"beacon committee selections",
validator_index = vindex, slot = selection.slot,
selection_proof = shortLog(selection.selection_proof)
continue
validator =
block:
# Selections operating using validator indices, so we should check
# if we have such validator index in our validator's pool and it
# still in place (not removed using keystore manager).
let key = indexToKey.getOrDefault(vindex)
if key.isNone():
warn "Non-existing validator encountered while processing " &
"beacon committee selections",
validator_index = vindex,
slot = selection.slot,
selection_proof = shortLog(selection.selection_proof)
continue
vc.attachedValidators[].getValidator(key.get()).valueOr:
notice "Found missing validator while processing " &
"beacon committee selections", validator_index = vindex,
slot = selection.slot,
validator = shortLog(key.get()),
selection_proof = shortLog(selection.selection_proof)
continue
vc.attesters.withValue(validator.pubkey, map):
map[].duties.withValue(selection.slot.epoch(), dap):
dap[].slotSig = Opt.some(selectionProof.toValidatorSig())
inc(sigres.selectionsProcessed)
sigres
func getIndex*(proof: SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee): Opt[int] =
if len(proof) == 0:
return Opt.none(int)
for index, value in proof.pairs():
if value.sync_committee_index == inindex:
return Opt.some(index)
Opt.none(int)
func hasSignature*(proof: SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee,
slot: Slot): bool =
let index = proof.getIndex(inindex).valueOr: return false
proof[index].signatures[int(slot.since_epoch_start())].isSome()
func getSignature*(proof: SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee,
slot: Slot): Opt[ValidatorSig] =
let index = proof.getIndex(inindex).valueOr:
return Opt.none(ValidatorSig)
proof[index].signatures[int(slot.since_epoch_start())]
proc setSignature*(proof: var SyncCommitteeSelectionProof,
inindex: IndexInSyncCommittee, slot: Slot,
signature: Opt[ValidatorSig]) =
let index = proof.getIndex(inindex).expect(
"EpochSelectionProof should be present at this moment")
proof[index].signatures[int(slot.since_epoch_start())] = signature
proc setSyncSelectionProof*(vc: ValidatorClientRef, pubkey: ValidatorPubKey,
inindex: IndexInSyncCommittee, slot: Slot,
duty: SyncCommitteeDuty,
signature: Opt[ValidatorSig]) =
let
proof =
block:
let length = len(duty.validator_sync_committee_indices)
var res = newSeq[EpochSelectionProof](length)
for i in 0 ..< length:
res[i].sync_committee_index = duty.validator_sync_committee_indices[i]
res
vc.syncCommitteeProofs.
mgetOrPut(slot.epoch(), default(SyncCommitteeProofs)).proofs.
mgetOrPut(pubkey, proof).setSignature(inindex, slot, signature)
proc getSyncCommitteeSelectionProof*(
vc: ValidatorClientRef,
pubkey: ValidatorPubKey,
epoch: Epoch
): Opt[SyncCommitteeSelectionProof] =
vc.syncCommitteeProofs.withValue(epoch, epochProofs):
epochProofs[].proofs.withValue(pubkey, validatorProofs):
return Opt.some(validatorProofs[])
do:
return Opt.none(SyncCommitteeSelectionProof)
do:
return Opt.none(SyncCommitteeSelectionProof)
proc getSyncCommitteeSelectionProof*(
vc: ValidatorClientRef,
pubkey: ValidatorPubKey,
slot: Slot,
inindex: IndexInSyncCommittee
): Opt[ValidatorSig] =
vc.syncCommitteeProofs.withValue(slot.epoch(), epochProofs):
epochProofs[].proofs.withValue(pubkey, validatorProofs):
let index = getIndex(validatorProofs[], inindex).valueOr:
return Opt.none(ValidatorSig)
return validatorProofs[][index].signatures[int(slot.since_epoch_start())]
do:
return Opt.none(ValidatorSig)
do:
return Opt.none(ValidatorSig)
proc getSyncCommitteeDutiesRequests*(
vc: ValidatorClientRef,
start, finish: Slot,
genesisRoot: Eth2Digest
): seq[SyncCommitteeSlotRequest] =
var res: seq[SyncCommitteeSlotRequest]
for epoch in start.epoch() .. finish.epoch():
let
fork = vc.forkAtEpoch(epoch)
period = epoch.sync_committee_period()
for duty in vc.syncDutiesForPeriod(period):
let validator = vc.attachedValidators[].getValidator(duty.pubkey).valueOr:
# Ignore all the validators which are not here anymore
continue
if validator.index.isNone():
# Ignore all the valididators which do not have index yet.
continue
let proof = vc.getSyncCommitteeSelectionProof(duty.pubkey, epoch).
get(default(SyncCommitteeSelectionProof))
for inindex in duty.validator_sync_committee_indices:
for slot in epoch.slots():
if slot < start: continue
if slot > finish: break
if proof.hasSignature(inindex, slot): continue
let
future =
getSyncCommitteeSelectionProof(validator, fork, genesisRoot, slot,
getSubcommitteeIndex(inindex))
req =
SyncCommitteeSlotRequest(
validator: validator,
fork: fork,
slot: slot,
duty: duty,
sync_committee_index: inindex,
sub_committee_index: getSubcommitteeIndex(inindex),
future: FutureBase(future))
res.add(req)
# We make requests sorted by slot number.
sorted(res, cmp, order = SortOrder.Ascending)
proc getSyncRequest*(
requests: var openArray[SyncCommitteeSlotRequest],
validator: AttachedValidator,
slot: Slot,
subcommittee_index: uint64
): Opt[SyncCommitteeSlotRequest] =
for mreq in requests.mitems():
if mreq.validator.pubkey == validator.pubkey and
mreq.slot == slot and
mreq.sub_committee_index == subcommittee_index:
return Opt.some(mreq)
Opt.none(SyncCommitteeSlotRequest)
proc fillSyncCommitteeSelectionProofs*(
vc: ValidatorClientRef,
start, finish: Slot
): Future[FillSignaturesResult] {.async.} =
let genesisRoot = vc.beaconGenesis.genesis_validators_root
var
requests: seq[SyncCommitteeSlotRequest]
sigres: FillSignaturesResult
withTimeMetric(client_sync_committee_selection_proof_time):
requests = vc.getSyncCommitteeDutiesRequests(start, finish, genesisRoot)
sigres.signaturesRequested = len(requests)
var pendingRequests = requests.mapIt(it.future)
while len(pendingRequests) > 0:
try:
discard await race(pendingRequests)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()): pending.add(future.cancelAndWait())
await noCancel allFutures(pending)
raise exc
pendingRequests =
block:
var res: seq[FutureBase]
for mreq in requests.mitems():
if isNil(mreq.future): continue
if not(mreq.future.finished()):
res.add(mreq.future)
else:
let signature =
if mreq.future.completed():
let sres = Future[SignatureResult](mreq.future).read()
if sres.isErr():
warn "Unable to create slot signature using remote signer",
reason = sres.error(), epoch = mreq.slot.epoch(),
slot = mreq.slot
Opt.none(ValidatorSig)
else:
inc(sigres.signaturesReceived)
Opt.some(sres.get())
else:
Opt.none(ValidatorSig)
mreq.future = nil
mreq.proof = signature
if signature.isSome():
vc.setSyncSelectionProof(mreq.validator.pubkey,
mreq.sync_committee_index,
mreq.slot, mreq.duty,
signature)
res
if vc.config.distributedEnabled:
withTimeMetric(client_obol_aggregated_sync_committee_selection_proof_time):
let (indexToKey, selections) =
block:
var
res1: Table[ValidatorIndex, Opt[ValidatorPubKey]]
res2: seq[RestSyncCommitteeSelection]
for mreq in requests.mitems():
if mreq.proof.isSome():
res1[mreq.validator.index.get()] = Opt.some(mreq.validator.pubkey)
res2.add(RestSyncCommitteeSelection(
validator_index: RestValidatorIndex(mreq.validator.index.get()),
subcommittee_index: uint64(mreq.sub_committee_index),
slot: mreq.slot, selection_proof: mreq.proof.get()))
(res1, res2)
sigres.selectionsRequested = len(selections)
if len(selections) == 0:
return sigres
let sresponse =
try:
# Query middleware for aggregated signatures.
await vc.submitSyncCommitteeSelections(selections,
ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to submit sync committee selections",
reason = exc.getFailureReason()
return sigres
except CancelledError as exc:
debug "Sync committee selections processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while trying to submit sync " &
"committee selections", reason = exc.msg, error = exc.name
return sigres
sigres.selectionsReceived = len(sresponse.data)
for selection in sresponse.data:
let
slot = selection.slot
subcommittee_index = selection.subcommittee_index
vindex = selection.validator_index.toValidatorIndex().valueOr:
warn "Invalid validator_index value encountered while processing " &
"sync committee selections",
validator_index = uint64(selection.validator_index),
reason = $error
continue
selectionProof = selection.selection_proof.load().valueOr:
warn "Invalid signature encountered while processing " &
"sync committee selections",
validator_index = vindex, slot = slot,
selection_proof = shortLog(selection.selection_proof)
continue
validator =
block:
# Selections operating using validator indices, so we should check
# if we have such validator index in our validator's pool and it
# still in place (not removed using keystore manager).
let key = indexToKey.getOrDefault(vindex)
if key.isNone():
warn "Non-existing validator encountered while processing " &
"sync committee selections",
validator_index = vindex,
slot = slot,
selection_proof = shortLog(selection.selection_proof)
continue
vc.attachedValidators[].getValidator(key.get()).valueOr:
notice "Found missing validator while processing " &
"sync committee selections", validator_index = vindex,
slot = slot,
validator = shortLog(key.get()),
selection_proof = shortLog(selection.selection_proof)
continue
request =
block:
let res = getSyncRequest(requests, validator, slot,
subcommittee_index)
if res.isNone():
warn "Found sync committee selection proof which was not " &
"requested",
slot = slot, subcommittee_index = subcommittee_index,
validator = shortLog(validator),
selection_proof = shortLog(selection.selection_proof)
continue
res.get()
vc.syncCommitteeProofs.withValue(slot.epoch(), epochProofs):
epochProofs[].proofs.withValue(validator.pubkey, signatures):
signatures[].setSignature(request.sync_committee_index,
selection.slot,
Opt.some(selection.selectionProof))
inc(sigres.selectionsProcessed)
sigres

View File

@ -11,7 +11,7 @@ import
../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types,
../validators/activity_metrics,
"."/[common, api]
"."/[common, api, selection_proofs]
const
ServiceName = "sync_committee_service"

View File

@ -144,7 +144,163 @@ const
("", "err(Missing hostname)")
]
ObolBeaconRequestTestVector = """
[
{
"validator_index": "1",
"slot": "1",
"selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"slot": "2",
"validator_index": "2",
"selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "3",
"selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
"slot": "3"
},
{
"selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
"validator_index": "4",
"slot": "4"
}
]"""
ObolBeaconResponseTestVector = """
{
"data": [
{
"validator_index": "1",
"slot": "1",
"selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "2",
"slot": "2",
"selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "3",
"slot": "3",
"selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "4",
"slot": "4",
"selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
}
]
}"""
ObolBeaconResponseTestVectorObject = [
(
validator_index: RestValidatorIndex(1),
slot: Slot(1),
selection_proof: "1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
),
(
validator_index: RestValidatorIndex(2),
slot: Slot(2),
selection_proof: "2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
),
(
validator_index: RestValidatorIndex(3),
slot: Slot(3),
selection_proof: "3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
),
(
validator_index: RestValidatorIndex(4),
slot: Slot(4),
selection_proof: "4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
)
]
ObolSyncRequestTestVector = """
[
{
"validator_index": "1",
"slot": "1",
"subcommittee_index": "1",
"selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "2",
"subcommittee_index": "2",
"slot": "2",
"selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"subcommittee_index": "3",
"validator_index": "3",
"slot": "3",
"selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "4",
"slot": "4",
"selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
"subcommittee_index": "4"
}
]"""
ObolSyncResponseTestVector = """
{
"data": [
{
"validator_index": "1",
"slot": "1",
"subcommittee_index": "1",
"selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "2",
"subcommittee_index": "2",
"slot": "2",
"selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"subcommittee_index": "3",
"validator_index": "3",
"slot": "3",
"selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"validator_index": "4",
"slot": "4",
"selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
"subcommittee_index": "4"
}
]
}"""
ObolSyncResponseTestVectorObject = [
(
validator_index: RestValidatorIndex(1),
slot: Slot(1),
subcommittee_index: 1'u64,
selection_proof: "1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
),
(
validator_index: RestValidatorIndex(2),
slot: Slot(2),
subcommittee_index: 2'u64,
selection_proof: "2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
),
(
validator_index: RestValidatorIndex(3),
slot: Slot(3),
subcommittee_index: 3'u64,
selection_proof: "3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
),
(
validator_index: RestValidatorIndex(4),
slot: Slot(4),
subcommittee_index: 4'u64,
selection_proof: "4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
)
]
type
TestDecodeTypes = seq[RestBeaconCommitteeSelection] |
seq[RestSyncCommitteeSelection]
AttestationDataTuple* = tuple[
slot: uint64,
index: uint64,
@ -238,12 +394,227 @@ proc createRootsSeen(
res
suite "Validator Client test suite":
proc decodeBytes[T: TestDecodeTypes](
t: typedesc[T],
value: openArray[byte],
contentType: Opt[ContentTypeData] = Opt.none(ContentTypeData)
): RestResult[T] =
let mediaType =
if contentType.isNone():
ApplicationJsonMediaType
else:
if isWildCard(contentType.get().mediaType):
return err("Incorrect Content-Type")
contentType.get().mediaType
if mediaType == ApplicationJsonMediaType:
try:
ok RestJson.decode(value, T,
requireAllFields = true,
allowUnknownFields = true)
except SerializationError as exc:
err("Serialization error")
else:
err("Content-Type not supported")
proc submitBeaconCommitteeSelectionsPlain(
body: seq[RestBeaconCommitteeSelection]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/beacon_committee_selections",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/submitBeaconCommitteeSelections
proc submitSyncCommitteeSelectionsPlain(
body: seq[RestSyncCommitteeSelection]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/sync_committee_selections",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/submitSyncCommitteeSelections
proc createServer(address: TransportAddress,
process: HttpProcessCallback, secure: bool): HttpServerRef =
let
socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
res = HttpServerRef.new(address, process, socketFlags = socketFlags)
res.get()
test "normalizeUri() test vectors":
for hostname in HostNames:
for vector in GoodTestVectors:
let expect = vector[1] % (hostname)
check $normalizeUri(parseUri(vector[0] % (hostname))) == expect
asyncTest "/eth/v1/validator/beacon_committee_selections " &
"serialization/deserialization test":
var clientRequest: seq[byte]
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isOk():
let request = r.get()
case request.uri.path
of "/eth/v1/validator/beacon_committee_selections":
clientRequest = await request.getBody()
let headers = HttpTable.init([("Content-Type", "application/json")])
return await request.respond(Http200, ObolBeaconResponseTestVector,
headers)
else:
return await request.respond(Http404, "Page not found")
else:
return dumbResponse()
let server = createServer(initTAddress("127.0.0.1:0"), process, false)
server.start()
defer:
await server.stop()
await server.closeWait()
let
serverAddress = server.instance.localAddress
flags = {RestClientFlag.CommaSeparatedArray}
remoteUri = "http://" & $serverAddress
client =
block:
let res = RestClientRef.new(remoteUri, flags = flags)
check res.isOk()
res.get()
selections =
block:
let res = decodeBytes(
seq[RestBeaconCommitteeSelection],
ObolBeaconRequestTestVector.toOpenArrayByte(
0, len(ObolBeaconRequestTestVector) - 1))
check res.isOk()
res.get()
defer:
await client.closeWait()
let resp = await client.submitBeaconCommitteeSelectionsPlain(selections)
check:
resp.status == 200
resp.contentType == MediaType.init("application/json")
let request =
block:
let res = decodeBytes(
seq[RestBeaconCommitteeSelection],
clientRequest)
check res.isOk()
res.get()
let response = block:
let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse,
resp.data, resp.contentType)
check res.isOk()
res.get()
check:
len(request) == len(selections)
len(response.data) == len(ObolBeaconResponseTestVectorObject)
# Checking response
for index, item in response.data.pairs():
check:
item.validator_index ==
ObolBeaconResponseTestVectorObject[index].validator_index
item.slot ==
ObolBeaconResponseTestVectorObject[index].slot
item.selection_proof.toHex() ==
ObolBeaconResponseTestVectorObject[index].selection_proof
# Checking request
for index, item in selections.pairs():
check:
item.validator_index == request[index].validator_index
item.slot == request[index].slot
item.selection_proof.toHex() == request[index].selection_proof.toHex()
asyncTest "/eth/v1/validator/sync_committee_selections " &
"serialization/deserialization test":
var clientRequest: seq[byte]
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
if r.isOk():
let request = r.get()
case request.uri.path
of "/eth/v1/validator/sync_committee_selections":
clientRequest = await request.getBody()
let headers = HttpTable.init([("Content-Type", "application/json")])
return await request.respond(Http200, ObolSyncResponseTestVector,
headers)
else:
return await request.respond(Http404, "Page not found")
else:
return dumbResponse()
let server = createServer(initTAddress("127.0.0.1:0"), process, false)
server.start()
defer:
await server.stop()
await server.closeWait()
let
serverAddress = server.instance.localAddress
flags = {RestClientFlag.CommaSeparatedArray}
remoteUri = "http://" & $serverAddress
client =
block:
let res = RestClientRef.new(remoteUri, flags = flags)
check res.isOk()
res.get()
selections =
block:
let res = decodeBytes(
seq[RestSyncCommitteeSelection],
ObolSyncRequestTestVector.toOpenArrayByte(
0, len(ObolSyncRequestTestVector) - 1))
check res.isOk()
res.get()
defer:
await client.closeWait()
let resp = await client.submitSyncCommitteeSelectionsPlain(selections)
check:
resp.status == 200
resp.contentType == MediaType.init("application/json")
let request =
block:
let res = decodeBytes(
seq[RestSyncCommitteeSelection],
clientRequest)
check res.isOk()
res.get()
let response = block:
let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse,
resp.data, resp.contentType)
check res.isOk()
res.get()
check:
len(request) == len(selections)
len(response.data) == len(ObolSyncResponseTestVectorObject)
# Checking response
for index, item in response.data.pairs():
check:
item.validator_index ==
ObolSyncResponseTestVectorObject[index].validator_index
item.slot ==
ObolSyncResponseTestVectorObject[index].slot
item.selection_proof.toHex() ==
ObolSyncResponseTestVectorObject[index].selection_proof
item.subcommittee_index == request[index].subcommittee_index
# Checking request
for index, item in selections.pairs():
check:
item.validator_index == request[index].validator_index
item.slot == request[index].slot
item.subcommittee_index == request[index].subcommittee_index
item.selection_proof.toHex() == request[index].selection_proof.toHex()
test "getAttestationDataScore() test vectors":
for vector in AttestationDataVectors:
let