VC: roles & strategies. (#4113)

* Initial commit.

* Roles changes.

* Fix all the compilation issues.

* Add beacon node roles.
Add loop for firstSuccessParallel().

* Remove unused variables.
This commit is contained in:
Eugene Kabanov 2022-09-29 10:57:14 +03:00 committed by GitHub
parent c367b14ad9
commit eea13ee5ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1743 additions and 603 deletions

View File

@ -50,6 +50,7 @@ const
defaultAdminListenAddress* = (static ValidIpAddress.init("127.0.0.1"))
defaultSigningNodeRequestTimeout* = 60
defaultBeaconNode* = "http://127.0.0.1:" & $defaultEth2RestPort
defaultBeaconNodeUri* = parseUri(defaultBeaconNode)
defaultListenAddressDesc* = $defaultListenAddress
defaultAdminListenAddressDesc* = $defaultAdminListenAddress
@ -891,9 +892,9 @@ type
beaconNodes* {.
desc: "URL addresses to one or more beacon node HTTP REST APIs",
defaultValue: @[defaultBeaconNode]
defaultValueDesc: $defaultBeaconNodeDesc
name: "beacon-node" .}: seq[string]
defaultValue: @[defaultBeaconNodeUri]
defaultValueDesc: $defaultBeaconNodeUri
name: "beacon-node" .}: seq[Uri]
SigningNodeConf* = object
configFile* {.

View File

@ -181,14 +181,19 @@ proc new*(T: type ValidatorClientRef,
let beaconNodes =
block:
var servers: seq[BeaconNodeServerRef]
let flags = {RestClientFlag.CommaSeparatedArray}
for url in config.beaconNodes:
let res = RestClientRef.new(url, flags = flags)
for index, url in config.beaconNodes.pairs():
let res = BeaconNodeServerRef.init(url, index)
if res.isErr():
warn "Unable to resolve remote beacon node server's hostname",
url = url
warn "Unable to initialize remote beacon node",
url = $url, error = res.error()
else:
servers.add(BeaconNodeServerRef(client: res.get(), endpoint: url))
debug "Beacon node was initialized", node = res.get()
servers.add(res.get())
let missingRoles = getMissingRoles(servers)
if len(missingRoles) != 0:
fatal "Beacon nodes do not use all required roles",
missing_roles = $missingRoles, nodes_count = len(servers)
quit 1
servers
if len(beaconNodes) == 0:

File diff suppressed because it is too large Load Diff

View File

@ -89,7 +89,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
let res =
try:
await vc.submitPoolAttestations(@[attestation])
await vc.submitPoolAttestations(@[attestation], ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to publish attestation",
attestation = shortLog(attestation),
@ -175,7 +175,7 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
let res =
try:
await vc.publishAggregateAndProofs(@[signedProof])
await vc.publishAggregateAndProofs(@[signedProof], ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to publish aggregated attestation",
attestation = shortLog(signedProof.message.aggregate),
@ -215,7 +215,8 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
# This call could raise ValidatorApiError, but it is handled in
# publishAttestationsAndAggregates().
let ad = await vc.produceAttestationData(slot, committee_index)
let ad = await vc.produceAttestationData(slot, committee_index,
ApiStrategyKind.Best)
let pendingAttestations =
block:
@ -298,7 +299,8 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
if len(aggregateItems) > 0:
let aggAttestation =
try:
await vc.getAggregatedAttestation(slot, attestationRoot)
await vc.getAggregatedAttestation(slot, attestationRoot,
ApiStrategyKind.Best)
except ValidatorApiError:
error "Unable to get aggregated attestation data", slot = slot,
attestation_root = shortLog(attestationRoot)

View File

@ -55,7 +55,8 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
let beaconBlock =
try:
await vc.produceBlockV2(slot, randaoReveal, graffiti)
await vc.produceBlockV2(slot, randaoReveal, graffiti,
ApiStrategyKind.Best)
except ValidatorApiError:
error "Unable to retrieve block data", slot = slot,
wall_slot = currentSlot, validator = shortLog(validator)
@ -103,7 +104,7 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
try:
let signedBlock = ForkedSignedBeaconBlock.init(beaconBlock, blockRoot,
signature)
await vc.publishBlock(signedBlock)
await vc.publishBlock(signedBlock, ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to publish block",
blockRoot = shortLog(blockRoot),

View File

@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[tables, os, sets, sequtils],
std/[tables, os, sets, sequtils, strutils, uri],
stew/[base10, results, byteutils],
bearssl/rand, chronos, presto, presto/client as presto_client,
chronicles, confutils, json_serialization/std/[options, net],
@ -93,6 +93,13 @@ type
dependentRoot*: Eth2Digest
duties*: seq[ProposerTask]
BeaconNodeRole* {.pure.} = enum
Duties,
AttestationData, AttestationPublish,
AggregatedData, AggregatedPublish,
BlockProposalData, BlockProposalPublish,
SyncCommitteeData, SyncCommitteePublish
BeaconNodeServer* = object
client*: RestClientRef
endpoint*: string
@ -101,6 +108,9 @@ type
genesis*: Option[RestGenesis]
syncInfo*: Option[RestSyncInfo]
status*: RestBeaconNodeStatus
roles*: set[BeaconNodeRole]
logIdent*: string
index*: int
EpochDuties* = object
duties*: Table[Epoch, DutyAndProof]
@ -175,17 +185,79 @@ const
SyncDutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
AllBeaconNodeRoles* = {
BeaconNodeRole.Duties,
BeaconNodeRole.AttestationData,
BeaconNodeRole.AttestationPublish,
BeaconNodeRole.AggregatedData,
BeaconNodeRole.AggregatedPublish,
BeaconNodeRole.BlockProposalData,
BeaconNodeRole.BlockProposalPublish,
BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish,
}
proc `$`*(roles: set[BeaconNodeRole]): string =
if card(roles) > 0:
if roles != AllBeaconNodeRoles:
var res: seq[string]
if BeaconNodeRole.Duties in roles:
res.add("duties")
if BeaconNodeRole.AttestationData in roles:
res.add("attestation-data")
if BeaconNodeRole.AttestationPublish in roles:
res.add("attestation-publish")
if BeaconNodeRole.AggregatedData in roles:
res.add("aggregated-data")
if BeaconNodeRole.AggregatedPublish in roles:
res.add("aggregated-publish")
if BeaconNodeRole.BlockProposalData in roles:
res.add("block-data")
if BeaconNodeRole.BlockProposalPublish in roles:
res.add("block-publish")
if BeaconNodeRole.SyncCommitteeData in roles:
res.add("sync-data")
if BeaconNodeRole.SyncCommitteePublish in roles:
res.add("sync-publish")
res.join(",")
else:
"{all}"
else:
"{}"
proc shortLog*(roles: set[BeaconNodeRole]): string =
var r = "AGBSD"
if BeaconNodeRole.AttestationData in roles:
if BeaconNodeRole.AttestationPublish in roles: r[0] = 'A' else: r[0] = 'a'
else:
if BeaconNodeRole.AttestationPublish in roles: r[0] = '+' else: r[0] = '-'
if BeaconNodeRole.AggregatedData in roles:
if BeaconNodeRole.AggregatedPublish in roles: r[1] = 'G' else: r[1] = 'g'
else:
if BeaconNodeRole.AggregatedPublish in roles: r[1] = '+' else: r[1] = '-'
if BeaconNodeRole.BlockProposalData in roles:
if BeaconNodeRole.BlockProposalPublish in roles: r[2] = 'B' else: r[2] = 'b'
else:
if BeaconNodeRole.BlockProposalPublish in roles: r[2] = '+' else: r[2] = '-'
if BeaconNodeRole.SyncCommitteeData in roles:
if BeaconNodeRole.SyncCommitteePublish in roles:
r[3] = 'S' else: r[3] = 's'
else:
if BeaconNodeRole.SyncCommitteePublish in roles:
r[3] = '+' else: r[3] = '-'
if BeaconNodeRole.Duties in roles: r[4] = 'D' else: r[4] = '-'
r
proc `$`*(bn: BeaconNodeServerRef): string =
if bn.ident.isSome():
bn.client.address.hostname & ":" &
Base10.toString(bn.client.address.port) & " [" & bn.ident.get() & "]"
bn.logIdent & "[" & bn.ident.get() & "]"
else:
bn.client.address.hostname & ":" &
Base10.toString(bn.client.address.port)
bn.logIdent
chronicles.formatIt BeaconNodeServerRef:
$it
chronicles.expandIt(BeaconNodeServerRef):
node = $it
node_index = it.index
node_roles = shortLog(it.roles)
chronicles.expandIt(RestAttesterDuty):
pubkey = shortLog(it.pubkey)
@ -214,6 +286,83 @@ proc isDefault*(sdap: SyncDutyAndProof): bool =
proc isDefault*(prd: ProposedData): bool =
prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc parseRoles*(data: string): Result[set[BeaconNodeRole], cstring] =
var res: set[BeaconNodeRole]
if len(data) == 0:
return ok(AllBeaconNodeRoles)
let parts = data.split("roles=")
if (len(parts) != 2) or (len(parts[0]) != 0):
return err("Invalid beacon node roles string")
let sroles = parts[1].split(",")
for srole in sroles:
case toLower(strip(srole))
of "":
discard
of "all":
res.incl(AllBeaconNodeRoles)
of "attestation":
res.incl({BeaconNodeRole.AttestationData,
BeaconNodeRole.AttestationPublish})
of "block":
res.incl({BeaconNodeRole.BlockProposalData,
BeaconNodeRole.BlockProposalPublish})
of "aggregated":
res.incl({BeaconNodeRole.AggregatedData,
BeaconNodeRole.AggregatedPublish})
of "sync":
res.incl({BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish})
of "attestation-data":
res.incl(BeaconNodeRole.AttestationData)
of "attestation-publish":
res.incl(BeaconNodeRole.AttestationPublish)
of "aggregated-data":
res.incl(BeaconNodeRole.AggregatedData)
of "aggregated-publish":
res.incl(BeaconNodeRole.AggregatedPublish)
of "block-data":
res.incl(BeaconNodeRole.BlockProposalData)
of "block-publish":
res.incl(BeaconNodeRole.BlockProposalPublish)
of "sync-data":
res.incl(BeaconNodeRole.SyncCommitteeData)
of "sync-publish":
res.incl(BeaconNodeRole.SyncCommitteePublish)
of "duties":
res.incl(BeaconNodeRole.Duties)
else:
return err("Invalid beacon node role string found")
ok(res)
proc init*(t: typedesc[BeaconNodeServerRef], remote: Uri,
index: int): Result[BeaconNodeServerRef, string] =
doAssert(index >= 0)
let
flags = {RestClientFlag.CommaSeparatedArray}
client =
block:
let res = RestClientRef.new($remote, flags = flags)
if res.isErr(): return err($res.error())
res.get()
roles =
block:
let res = parseRoles(remote.anchor)
if res.isErr(): return err($res.error())
res.get()
let server = BeaconNodeServerRef(
client: client, endpoint: $remote, index: index, roles: roles,
logIdent: client.address.hostname & ":" &
Base10.toString(client.address.port)
)
ok(server)
proc getMissingRoles*(n: openArray[BeaconNodeServerRef]): set[BeaconNodeRole] =
var res: set[BeaconNodeRole] = AllBeaconNodeRoles
for node in n.items():
res.excl(node.roles)
res
proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest,
duty: RestAttesterDuty,
slotSig: Option[ValidatorSig]): DutyAndProof =

View File

@ -60,7 +60,7 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
let res =
try:
await vc.getValidators(idents)
await vc.getValidators(idents, ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to get head state's validator information"
return
@ -119,7 +119,7 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
let res =
try:
await vc.getAttesterDuties(epoch, indices)
await vc.getAttesterDuties(epoch, indices, ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to get attester duties", epoch = epoch
return 0
@ -236,7 +236,7 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
res =
try:
await vc.getSyncCommitteeDuties(epoch, indices)
await vc.getSyncCommitteeDuties(epoch, indices, ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to get sync committee duties", epoch = epoch
return 0
@ -390,7 +390,8 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} =
res
if len(subscriptions) > 0:
let res = await vc.prepareBeaconCommitteeSubnet(subscriptions)
let res = await vc.prepareBeaconCommitteeSubnet(subscriptions,
ApiStrategyKind.First)
if not(res):
error "Failed to subscribe validators"
@ -434,7 +435,8 @@ proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} =
res.add(sub)
res
if len(subscriptions) > 0:
let res = await vc.prepareSyncCommitteeSubnets(subscriptions)
let res = await vc.prepareSyncCommitteeSubnets(subscriptions,
ApiStrategyKind.First)
if not(res):
error "Failed to subscribe validators"
@ -457,7 +459,8 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
if vc.attachedValidators[].count() != 0:
try:
let res = await vc.getProposerDuties(currentEpoch)
let res = await vc.getProposerDuties(currentEpoch,
ApiStrategyKind.First)
let
dependentRoot = res.dependent_root
duties = res.data

View File

@ -20,11 +20,21 @@ type
incompatible*: int
nosync*: int
proc onlineNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] =
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
proc onlineNodes*(vc: ValidatorClientRef,
roles: set[BeaconNodeRole] = {}): seq[BeaconNodeServerRef] =
if len(roles) == 0:
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
else:
vc.beaconNodes.filterIt((it.roles * roles != {}) and
(it.status == RestBeaconNodeStatus.Online))
proc onlineNodesCount*(vc: ValidatorClientRef): int =
vc.beaconNodes.countIt(it.status == RestBeaconNodeStatus.Online)
proc onlineNodesCount*(vc: ValidatorClientRef,
roles: set[BeaconNodeRole] = {}): int =
if len(roles) == 0:
vc.beaconNodes.countIt(it.status == RestBeaconNodeStatus.Online)
else:
vc.beaconNodes.countIt((it.roles * roles != {}) and
(it.status == RestBeaconNodeStatus.Online))
proc unusableNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] =
vc.beaconNodes.filterIt(it.status != RestBeaconNodeStatus.Online)
@ -48,11 +58,11 @@ proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters =
inc(res.online)
res
proc waitOnlineNodes*(vc: ValidatorClientRef,
timeoutFut: Future[void] = nil) {.async.} =
proc waitOnlineNodes*(vc: ValidatorClientRef, timeoutFut: Future[void] = nil,
roles: set[BeaconNodeRole] = {}) {.async.} =
doAssert(not(isNil(vc.fallbackService)))
while true:
if vc.onlineNodesCount() != 0:
if vc.onlineNodesCount(roles) != 0:
break
else:
if vc.fallbackService.onlineEvent.isSet():

View File

@ -51,7 +51,7 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} =
let forks =
try:
await vc.getForkSchedule()
await vc.getForkSchedule(ApiStrategyKind.Best)
except ValidatorApiError as exc:
error "Unable to retrieve fork schedule", reason = exc.msg
return

View File

@ -63,7 +63,7 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
let res =
try:
await vc.submitPoolSyncCommitteeSignature(message)
await vc.submitPoolSyncCommitteeSignature(message, ApiStrategyKind.First)
except ValidatorApiError:
error "Unable to publish sync committee message",
message = shortLog(message),
@ -177,7 +177,8 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
let res =
try:
await vc.publishContributionAndProofs(@[restSignedProof])
await vc.publishContributionAndProofs(@[restSignedProof],
ApiStrategyKind.First)
except ValidatorApiError as err:
error "Unable to publish sync contribution",
contribution = shortLog(proof.contribution),
@ -240,7 +241,8 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
try:
await vc.produceSyncCommitteeContribution(slot,
item.subcommitteeIdx,
beaconBlockRoot)
beaconBlockRoot,
ApiStrategyKind.Best)
except ValidatorApiError:
error "Unable to get sync message contribution data", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot)
@ -322,7 +324,7 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
let beaconBlockRoot =
block:
try:
let res = await vc.getHeadBlockRoot()
let res = await vc.getHeadBlockRoot(ApiStrategyKind.First)
res.root
except ValidatorApiError as exc:
error "Unable to retrieve head block's root to sign", reason = exc.msg