Eugene Kabanov ac1b02698a
VC: Use scoring function to select best attestation data when using multiple BNs. (#5101)
* Initial commit.

* Move score selection log statement to debug level.

* Fix proper float64 log format.

* Cleanup imports and legacy code.

* Address review comments.

* Address review comments.

* Fix scoring function.

* Address review comments.

* Address review comments 2.
Fix registerBlock post-rebase issues.

* Simplify innerLoop decision making.

* Make getAttestationDataScore() more testable.
Add tests for getAttestationDataScore().

* Add modified AllTests copy.
2023-07-06 11:14:22 +02:00

1350 lines
47 KiB
Nim

# beacon_chain
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[tables, os, sets, sequtils, strutils, uri],
stew/[base10, results, byteutils],
bearssl/rand, chronos, presto, presto/client as presto_client,
chronicles, confutils,
metrics, metrics/chronos_httpserver,
".."/spec/datatypes/[base, phase0, altair],
".."/spec/[eth2_merkleization, helpers, signatures, validator],
".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client,
dynamic_fee_recipients],
".."/consensus_object_pools/block_pools_types,
".."/validators/[keystore_management, validator_pool, slashing_protection,
validator_duties],
".."/[conf, beacon_clock, version, nimbus_binary_common]
from std/times import Time, toUnix, fromUnix, getTime
export
os, sets, sequtils, chronos, presto, chronicles, confutils,
nimbus_binary_common, version, conf, tables, results, base10,
byteutils, presto_client, eth2_rest_serialization, rest_beacon_client,
phase0, altair, helpers, signatures, validator, eth2_merkleization,
beacon_clock, keystore_management, slashing_protection, validator_pool,
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime, block_pools_types,
base, metrics
const
SYNC_TOLERANCE* = 4'u64
SLOT_LOOKAHEAD* = 1.seconds
HISTORICAL_DUTIES_EPOCHS* = 2'u64
TIME_DELAY_FROM_SLOT* = 79.milliseconds
SUBSCRIPTION_BUFFER_SLOTS* = 2'u64
EPOCHS_BETWEEN_VALIDATOR_REGISTRATION* = 1
DelayBuckets* = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf]
ZeroTimeDiff* = TimeDiff(nanoseconds: 0'i64)
type
ServiceState* {.pure.} = enum
Initialized, Running, Error, Closing, Closed
RegistrationKind* {.pure.} = enum
Cached, IncorrectTime, MissingIndex, MissingFee, MissingGasLimit,
ErrorSignature, NoSignature
PendingValidatorRegistration* = object
registration*: SignedValidatorRegistrationV1
future*: Future[SignatureResult]
ClientServiceRef* = ref object of RootObj
name*: string
state*: ServiceState
lifeFut*: Future[void]
client*: ValidatorClientRef
DutiesServiceRef* = ref object of ClientServiceRef
FallbackServiceRef* = ref object of ClientServiceRef
changesEvent*: AsyncEvent
ForkServiceRef* = ref object of ClientServiceRef
AttestationServiceRef* = ref object of ClientServiceRef
BlockServiceRef* = ref object of ClientServiceRef
SyncCommitteeServiceRef* = ref object of ClientServiceRef
DoppelgangerServiceRef* = ref object of ClientServiceRef
enabled*: bool
DutyAndProof* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
data*: RestAttesterDuty
slotSig*: Opt[ValidatorSig]
SyncCommitteeDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
validator_sync_committee_index*: IndexInSyncCommittee
SyncCommitteeSubscriptionInfo* = object
validator_index*: ValidatorIndex
validator_sync_committee_indices*: seq[IndexInSyncCommittee]
ProposerTask* = object
duty*: RestProposerDuty
future*: Future[void]
ProposedData* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
duties*: seq[ProposerTask]
BeaconNodeRole* {.pure.} = enum
Duties,
AttestationData, AttestationPublish,
AggregatedData, AggregatedPublish,
BlockProposalData, BlockProposalPublish,
SyncCommitteeData, SyncCommitteePublish,
NoTimeCheck
RestBeaconNodeFeature* {.pure.} = enum
NoNimbusExtensions ## BN do not supports Nimbus Extensions
TimeOffset* = object
value: int64
BeaconNodeServer* = object
client*: RestClientRef
endpoint*: string
config*: VCRuntimeConfig
ident*: Opt[string]
genesis*: Opt[RestGenesis]
syncInfo*: Opt[RestSyncInfo]
status*: RestBeaconNodeStatus
features*: set[RestBeaconNodeFeature]
roles*: set[BeaconNodeRole]
logIdent*: string
index*: int
timeOffset*: Opt[TimeOffset]
EpochDuties* = object
duties*: Table[Epoch, DutyAndProof]
EpochSyncDuties* = object
duties*: Table[Epoch, SyncCommitteeDuty]
RestBeaconNodeStatus* {.pure.} = enum
Offline, ## BN is offline.
Online, ## BN is online, passed checkOnline() check.
Incompatible, ## BN configuration is NOT compatible with VC.
Compatible, ## BN configuration is compatible with VC configuration.
NotSynced, ## BN is not in sync.
OptSynced, ## BN is optimistically synced (EL is not in sync).
Synced, ## BN and EL are synced.
UnexpectedCode, ## BN sends unexpected/incorrect HTTP status code .
UnexpectedResponse, ## BN sends unexpected/incorrect response.
BrokenClock, ## BN wall clock is broken or has significan offset.
InternalError ## BN reports internal error.
BeaconNodesCounters* = object
data*: array[int(high(RestBeaconNodeStatus)) + 1, int]
BeaconNodeServerRef* = ref BeaconNodeServer
AttesterMap* = Table[ValidatorPubKey, EpochDuties]
SyncCommitteeDutiesMap* = Table[ValidatorPubKey, EpochSyncDuties]
ProposerMap* = Table[Epoch, ProposedData]
DoppelgangerStatus* {.pure.} = enum
None, Checking, Passed
DoppelgangerAttempt* {.pure.} = enum
None, Failure, SuccessTrue, SuccessFalse
BlockWaiter* = object
future*: Future[seq[Eth2Digest]]
count*: int
BlockDataItem* = object
blocks: seq[Eth2Digest]
waiters*: seq[BlockWaiter]
ValidatorClient* = object
config*: ValidatorClientConf
metricsServer*: Opt[MetricsHttpServerRef]
graffitiBytes*: GraffitiBytes
beaconNodes*: seq[BeaconNodeServerRef]
fallbackService*: FallbackServiceRef
forkService*: ForkServiceRef
dutiesService*: DutiesServiceRef
attestationService*: AttestationServiceRef
blockService*: BlockServiceRef
syncCommitteeService*: SyncCommitteeServiceRef
doppelgangerService*: DoppelgangerServiceRef
runSlotLoopFut*: Future[void]
runKeystoreCachePruningLoopFut*: Future[void]
sigintHandleFut*: Future[void]
sigtermHandleFut*: Future[void]
keymanagerHost*: ref KeymanagerHost
keymanagerServer*: RestServerRef
keystoreCache*: KeystoreCacheRef
beaconClock*: BeaconClock
attachedValidators*: ref ValidatorPool
forks*: seq[Fork]
preGenesisEvent*: AsyncEvent
genesisEvent*: AsyncEvent
forksAvailable*: AsyncEvent
nodesAvailable*: AsyncEvent
indicesAvailable*: AsyncEvent
doppelExit*: AsyncEvent
attesters*: AttesterMap
proposers*: ProposerMap
syncCommitteeDuties*: SyncCommitteeDutiesMap
beaconGenesis*: RestGenesis
proposerTasks*: Table[Slot, seq[ProposerTask]]
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
blocksSeen*: Table[Slot, BlockDataItem]
rootsSeen*: Table[Eth2Digest, Slot]
processingDelay*: Opt[Duration]
rng*: ref HmacDrbgContext
ApiStrategyKind* {.pure.} = enum
Priority, Best, First
ApiFailure* {.pure.} = enum
Communication, Invalid, NotFound, OptSynced, NotSynced, Internal,
UnexpectedCode, UnexpectedResponse, NoError
ApiNodeFailure* = object
node*: BeaconNodeServerRef
request*: string
strategy*: Opt[ApiStrategyKind]
failure*: ApiFailure
status*: Opt[int]
reason*: string
ValidatorClientRef* = ref ValidatorClient
ValidatorClientError* = object of CatchableError
ValidatorApiError* = object of ValidatorClientError
data*: seq[ApiNodeFailure]
const
DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH)
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
AllBeaconNodeRoles* = {
BeaconNodeRole.Duties,
BeaconNodeRole.AttestationData,
BeaconNodeRole.AttestationPublish,
BeaconNodeRole.AggregatedData,
BeaconNodeRole.AggregatedPublish,
BeaconNodeRole.BlockProposalData,
BeaconNodeRole.BlockProposalPublish,
BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish
}
## AllBeaconNodeRoles missing BeaconNodeRole.NoTimeCheck, because timecheks
## are enabled by default.
AllBeaconNodeStatuses* = {
RestBeaconNodeStatus.Offline,
RestBeaconNodeStatus.Online,
RestBeaconNodeStatus.Incompatible,
RestBeaconNodeStatus.Compatible,
RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.OptSynced,
RestBeaconNodeStatus.Synced,
RestBeaconNodeStatus.UnexpectedCode,
RestBeaconNodeStatus.UnexpectedResponse,
RestBeaconNodeStatus.BrokenClock,
RestBeaconNodeStatus.InternalError
}
proc `$`*(to: TimeOffset): string =
if to.value < 0:
"-" & $chronos.nanoseconds(-to.value)
else:
$chronos.nanoseconds(to.value)
chronicles.formatIt(TimeOffset):
$it
chronicles.formatIt(Opt[TimeOffset]):
if it.isSome(): $(it.get()) else: "<unknown>"
proc `$`*(roles: set[BeaconNodeRole]): string =
if card(roles) > 0:
if roles != AllBeaconNodeRoles:
var res: seq[string]
if BeaconNodeRole.Duties in roles:
res.add("duties")
if BeaconNodeRole.AttestationData in roles:
res.add("attestation-data")
if BeaconNodeRole.AttestationPublish in roles:
res.add("attestation-publish")
if BeaconNodeRole.AggregatedData in roles:
res.add("aggregated-data")
if BeaconNodeRole.AggregatedPublish in roles:
res.add("aggregated-publish")
if BeaconNodeRole.BlockProposalData in roles:
res.add("block-data")
if BeaconNodeRole.BlockProposalPublish in roles:
res.add("block-publish")
if BeaconNodeRole.SyncCommitteeData in roles:
res.add("sync-data")
if BeaconNodeRole.SyncCommitteePublish in roles:
res.add("sync-publish")
if BeaconNodeRole.NoTimeCheck in roles:
res.add("no-timecheck")
res.join(",")
else:
"{all}"
else:
"{}"
proc `$`*(status: RestBeaconNodeStatus): string =
case status
of RestBeaconNodeStatus.Offline: "offline"
of RestBeaconNodeStatus.Online: "online"
of RestBeaconNodeStatus.Incompatible: "incompatible"
of RestBeaconNodeStatus.Compatible: "compatible"
of RestBeaconNodeStatus.NotSynced: "bn-unsynced"
of RestBeaconNodeStatus.OptSynced: "el-unsynced"
of RestBeaconNodeStatus.Synced: "synced"
of RestBeaconNodeStatus.UnexpectedCode: "unexpected code"
of RestBeaconNodeStatus.UnexpectedResponse: "unexpected data"
of RestBeaconNodeStatus.InternalError: "internal error"
of RestBeaconNodeStatus.BrokenClock: "broken clock"
proc `$`*(failure: ApiFailure): string =
case failure
of ApiFailure.Communication: "communication"
of ApiFailure.Invalid: "invalid-request"
of ApiFailure.NotFound: "not-found"
of ApiFailure.NotSynced: "not-synced"
of ApiFailure.OptSynced: "opt-synced"
of ApiFailure.Internal: "internal-issue"
of ApiFailure.UnexpectedCode: "unexpected-code"
of ApiFailure.UnexpectedResponse: "unexpected-data"
of ApiFailure.NoError: "status-update"
proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters =
var res = BeaconNodesCounters()
for node in vc.beaconNodes: inc(res.data[int(node.status)])
res
proc hash*(f: ApiNodeFailure): Hash =
hash(f.failure)
proc toString*(strategy: ApiStrategyKind): string =
case strategy
of ApiStrategyKind.First:
"first"
of ApiStrategyKind.Best:
"best"
of ApiStrategyKind.Priority:
"priority"
func getFailureReason*(failure: ApiNodeFailure): string =
let status =
if failure.status.isSome():
Base10.toString(uint32(failure.status.get()))
else:
"n/a"
let request =
if failure.strategy.isSome():
failure.request & "(" & failure.strategy.get().toString() & ")"
else:
failure.request & "()"
[failure.reason, status, request, $failure.failure].join(";")
proc getFailureReason*(exc: ref ValidatorApiError): string =
var counts: array[int(high(ApiFailure)) + 1, int]
let
errors = exc[].data
errorsCount = len(errors)
if errorsCount > 1:
let distinctErrors =
block:
var res: seq[ApiNodeFailure]
for item in errors.toHashSet().items():
res.add(item)
res
if len(distinctErrors) > 1:
# If we have many unique errors, we going to report only failures,
# full reasons could be obtained via previosly made log statements.
"[" & distinctErrors.mapIt($it.failure).join(",") & "]"
else:
getFailureReason(distinctErrors[0])
elif errorsCount == 1:
getFailureReason(errors[0])
else:
exc.msg
proc shortLog*(roles: set[BeaconNodeRole]): string =
var r = "AGBSDT"
if BeaconNodeRole.AttestationData in roles:
if BeaconNodeRole.AttestationPublish in roles: r[0] = 'A' else: r[0] = 'a'
else:
if BeaconNodeRole.AttestationPublish in roles: r[0] = '+' else: r[0] = '-'
if BeaconNodeRole.AggregatedData in roles:
if BeaconNodeRole.AggregatedPublish in roles: r[1] = 'G' else: r[1] = 'g'
else:
if BeaconNodeRole.AggregatedPublish in roles: r[1] = '+' else: r[1] = '-'
if BeaconNodeRole.BlockProposalData in roles:
if BeaconNodeRole.BlockProposalPublish in roles: r[2] = 'B' else: r[2] = 'b'
else:
if BeaconNodeRole.BlockProposalPublish in roles: r[2] = '+' else: r[2] = '-'
if BeaconNodeRole.SyncCommitteeData in roles:
if BeaconNodeRole.SyncCommitteePublish in roles:
r[3] = 'S' else: r[3] = 's'
else:
if BeaconNodeRole.SyncCommitteePublish in roles:
r[3] = '+' else: r[3] = '-'
if BeaconNodeRole.Duties in roles: r[4] = 'D' else: r[4] = '-'
if BeaconNodeRole.NoTimeCheck notin roles: r[5] = 'T' else: r[5] = '-'
r
proc `$`*(bn: BeaconNodeServerRef): string =
if bn.ident.isSome():
bn.logIdent & "[" & bn.ident.get() & "]"
else:
bn.logIdent
proc validatorLog*(key: ValidatorPubKey,
index: ValidatorIndex): string =
var res = shortLog(key)
res.add('@')
res.add(Base10.toString(uint64(index)))
res
proc validatorLog*(validator: AttachedValidator): string =
var res = shortLog(validator)
res.add('@')
if validator.index.isSome():
res.add(Base10.toString(uint64(validator.index.get())))
else:
res.add("<missing>")
res
chronicles.expandIt(BeaconNodeServerRef):
node = $it
node_index = it.index
node_roles = shortLog(it.roles)
chronicles.expandIt(RestAttesterDuty):
pubkey = shortLog(it.pubkey)
slot = it.slot
validator_index = it.validator_index
committee_index = it.committee_index
committee_length = it.committee_length
committees_at_slot = it.committees_at_slot
validator_committee_index = it.validator_committee_index
chronicles.expandIt(SyncCommitteeDuty):
pubkey = shortLog(it.pubkey)
validator_index = it.validator_index
validator_sync_committee_index = it.validator_sync_committee_index
proc equals*(info: VCRuntimeConfig, name: string, check: uint64): bool =
let numstr = info.getOrDefault(name, "missing")
if numstr == "missing": return false
let value = Base10.decode(uint64, numstr).valueOr:
return false
value == check
proc equals*(info: VCRuntimeConfig, name: string, check: DomainType): bool =
let domstr = info.getOrDefault(name, "missing")
if domstr == "missing": return false
let value =
try:
var dres: DomainType
hexToByteArray(domstr, distinctBase(dres))
dres
except ValueError:
return false
value == check
proc equals*(info: VCRuntimeConfig, name: string, check: Epoch): bool =
info.equals(name, uint64(check))
proc getOrDefault*(info: VCRuntimeConfig, name: string,
default: uint64): uint64 =
let numstr = info.getOrDefault(name, "missing")
if numstr == "missing": return default
Base10.decode(uint64, numstr).valueOr:
return default
proc getOrDefault*(info: VCRuntimeConfig, name: string, default: Epoch): Epoch =
Epoch(info.getOrDefault(name, uint64(default)))
proc checkConfig*(c: VCRuntimeConfig): bool =
c.equals("MAX_VALIDATORS_PER_COMMITTEE", MAX_VALIDATORS_PER_COMMITTEE) and
c.equals("SLOTS_PER_EPOCH", SLOTS_PER_EPOCH) and
c.equals("SECONDS_PER_SLOT", SECONDS_PER_SLOT) and
c.equals("EPOCHS_PER_ETH1_VOTING_PERIOD", EPOCHS_PER_ETH1_VOTING_PERIOD) and
c.equals("SLOTS_PER_HISTORICAL_ROOT", SLOTS_PER_HISTORICAL_ROOT) and
c.equals("EPOCHS_PER_HISTORICAL_VECTOR", EPOCHS_PER_HISTORICAL_VECTOR) and
c.equals("EPOCHS_PER_SLASHINGS_VECTOR", EPOCHS_PER_SLASHINGS_VECTOR) and
c.equals("HISTORICAL_ROOTS_LIMIT", HISTORICAL_ROOTS_LIMIT) and
c.equals("VALIDATOR_REGISTRY_LIMIT", VALIDATOR_REGISTRY_LIMIT) and
c.equals("MAX_PROPOSER_SLASHINGS", MAX_PROPOSER_SLASHINGS) and
c.equals("MAX_ATTESTER_SLASHINGS", MAX_ATTESTER_SLASHINGS) and
c.equals("MAX_ATTESTATIONS", MAX_ATTESTATIONS) and
c.equals("MAX_DEPOSITS", MAX_DEPOSITS) and
c.equals("MAX_VOLUNTARY_EXITS", MAX_VOLUNTARY_EXITS) and
c.equals("DOMAIN_BEACON_PROPOSER", DOMAIN_BEACON_PROPOSER) and
c.equals("DOMAIN_BEACON_ATTESTER", DOMAIN_BEACON_ATTESTER) and
c.equals("DOMAIN_RANDAO", DOMAIN_RANDAO) and
c.equals("DOMAIN_DEPOSIT", DOMAIN_DEPOSIT) and
c.equals("DOMAIN_VOLUNTARY_EXIT", DOMAIN_VOLUNTARY_EXIT) and
c.equals("DOMAIN_SELECTION_PROOF", DOMAIN_SELECTION_PROOF) and
c.equals("DOMAIN_AGGREGATE_AND_PROOF", DOMAIN_AGGREGATE_AND_PROOF) and
c.hasKey("ALTAIR_FORK_VERSION") and c.hasKey("ALTAIR_FORK_EPOCH") and
not(c.equals("ALTAIR_FORK_EPOCH", FAR_FUTURE_EPOCH))
proc updateStatus*(node: BeaconNodeServerRef,
status: RestBeaconNodeStatus,
failure: ApiNodeFailure) =
logScope:
node = node
case status
of RestBeaconNodeStatus.Offline:
if node.status != status:
warn "Beacon node down",
reason = failure.getFailureReason()
node.status = status
of RestBeaconNodeStatus.Online:
if node.status != status:
let version = if node.ident.isSome(): node.ident.get() else: "<missing>"
notice "Beacon node is online", agent_version = version
node.status = status
of RestBeaconNodeStatus.Incompatible:
if node.status != status:
warn "Beacon node has incompatible configuration",
reason = failure.getFailureReason()
node.status = status
of RestBeaconNodeStatus.Compatible:
if node.status != status:
notice "Beacon node is compatible"
node.status = status
of RestBeaconNodeStatus.NotSynced:
if node.status notin {RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.OptSynced}:
doAssert(node.syncInfo.isSome())
let si = node.syncInfo.get()
warn "Beacon node not in sync", reason = failure.getFailureReason(),
last_head_slot = si.head_slot,
last_sync_distance = si.sync_distance,
last_optimistic = si.is_optimistic.get(false)
node.status = status
of RestBeaconNodeStatus.OptSynced:
if node.status != status:
doAssert(node.syncInfo.isSome())
let si = node.syncInfo.get()
notice "Beacon node optimistically synced (Execution client not in sync)",
reason = failure.getFailureReason(),
last_head_slot = si.head_slot,
last_sync_distance = si.sync_distance,
last_optimistic = si.is_optimistic.get(false)
node.status = status
of RestBeaconNodeStatus.Synced:
if node.status != status:
doAssert(node.syncInfo.isSome())
let si = node.syncInfo.get()
notice "Beacon node is in sync",
head_slot = si.head_slot,
sync_distance = si.sync_distance,
is_optimistic = si.is_optimistic.get(false)
node.status = status
of RestBeaconNodeStatus.UnexpectedResponse:
if node.status != status:
error "Beacon node provides unexpected response",
reason = failure.getFailureReason()
node.status = status
of RestBeaconNodeStatus.UnexpectedCode:
if node.status != status:
error "Beacon node provides unexpected status code",
reason = failure.getFailureReason()
node.status = status
of RestBeaconNodeStatus.InternalError:
if node.status != status:
warn "Beacon node reports internal error",
reason = failure.getFailureReason()
node.status = status
of RestBeaconNodeStatus.BrokenClock:
if node.status != status:
warn "Beacon node's clock is out of order, (beacon node is unusable)"
node.status = status
proc stop*(csr: ClientServiceRef) {.async.} =
debug "Stopping service", service = csr.name
if csr.state == ServiceState.Running:
csr.state = ServiceState.Closing
if not(csr.lifeFut.finished()):
await csr.lifeFut.cancelAndWait()
csr.state = ServiceState.Closed
debug "Service stopped", service = csr.name
proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc isDefault*(prd: ProposedData): bool =
prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc parseRoles*(data: string): Result[set[BeaconNodeRole], cstring] =
var res: set[BeaconNodeRole]
if len(data) == 0:
return ok(AllBeaconNodeRoles)
let parts = data.split("roles=")
if (len(parts) != 2) or (len(parts[0]) != 0):
return err("Invalid beacon node roles string")
let sroles = parts[1].split(",")
for srole in sroles:
case toLower(strip(srole))
of "":
discard
of "all":
res.incl(AllBeaconNodeRoles)
of "attestation":
res.incl({BeaconNodeRole.AttestationData,
BeaconNodeRole.AttestationPublish})
of "block":
res.incl({BeaconNodeRole.BlockProposalData,
BeaconNodeRole.BlockProposalPublish})
of "aggregated":
res.incl({BeaconNodeRole.AggregatedData,
BeaconNodeRole.AggregatedPublish})
of "sync":
res.incl({BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish})
of "attestation-data":
res.incl(BeaconNodeRole.AttestationData)
of "attestation-publish":
res.incl(BeaconNodeRole.AttestationPublish)
of "aggregated-data":
res.incl(BeaconNodeRole.AggregatedData)
of "aggregated-publish":
res.incl(BeaconNodeRole.AggregatedPublish)
of "block-data":
res.incl(BeaconNodeRole.BlockProposalData)
of "block-publish":
res.incl(BeaconNodeRole.BlockProposalPublish)
of "sync-data":
res.incl(BeaconNodeRole.SyncCommitteeData)
of "sync-publish":
res.incl(BeaconNodeRole.SyncCommitteePublish)
of "duties":
res.incl(BeaconNodeRole.Duties)
of "no-timecheck":
res.incl(BeaconNodeRole.NoTimeCheck)
else:
return err("Invalid beacon node role string found")
if res == {BeaconNodeRole.NoTimeCheck}:
res.incl(AllBeaconNodeRoles)
ok(res)
proc normalizeUri*(r: Uri): Result[Uri, cstring] =
const
MissingPortNumber = cstring("Missing port number")
MissingHostname = cstring("Missing hostname")
UnknownScheme = cstring("Unknown scheme value")
if ($r).toLowerAscii().startsWith("http://") or
($r).toLowerAscii().startsWith("https://"):
# When a scheme is provided, only a hostname is required
if len(r.hostname) == 0: return err(MissingHostname)
return ok(r)
# Check for unknown scheme
if ($r).contains("://"):
return err(UnknownScheme)
# Add the default scheme (http)
let normalized =
if ($r).startsWith("//"):
parseUri("http:" & $r)
else:
parseUri("http://" & $r)
if len(normalized.hostname) == 0:
return err(MissingHostname)
if len(normalized.port) == 0:
return err(MissingPortNumber)
ok(normalized)
proc init*(t: typedesc[BeaconNodeServerRef], remote: Uri,
index: int): Result[BeaconNodeServerRef, string] =
doAssert(index >= 0)
let
flags = {RestClientFlag.CommaSeparatedArray}
socketFlags = {SocketFlags.TcpNoDelay}
remoteUri = normalizeUri(remote).valueOr:
return err($error)
client = RestClientRef.new($remoteUri, flags = flags,
socketFlags = socketFlags).valueOr:
return err($error)
roles = parseRoles(remoteUri.anchor).valueOr:
return err($error)
let server = BeaconNodeServerRef(
client: client, endpoint: $remoteUri, index: index, roles: roles,
logIdent: $client.address.getUri(),
status: RestBeaconNodeStatus.Offline
)
ok(server)
proc getMissingRoles*(n: openArray[BeaconNodeServerRef]): set[BeaconNodeRole] =
var res: set[BeaconNodeRole] = AllBeaconNodeRoles
for node in n.items():
res.excl(node.roles)
res
proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest,
duty: RestAttesterDuty,
slotSig: Opt[ValidatorSig]): DutyAndProof =
DutyAndProof(epoch: epoch, dependentRoot: dependentRoot, data: duty,
slotSig: slotSig)
proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest,
data: openArray[ProposerTask]): ProposedData =
ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data)
proc getCurrentSlot*(vc: ValidatorClientRef): Opt[Slot] =
let res = vc.beaconClock.now().toSlot()
if res.afterGenesis:
Opt.some(res.slot)
else:
Opt.none(Slot)
proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[DutyAndProof] =
## Returns all `DutyAndProof` for the given `slot`.
var res: seq[DutyAndProof]
let epoch = slot.epoch()
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
if duty.data.slot == slot:
res.add(duty)
res
proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[SyncCommitteeDuty] =
## Returns all `SyncCommitteeDuty` for the given `slot`.
var res: seq[SyncCommitteeDuty]
let epoch = slot.epoch()
for key, item in mpairs(vc.syncCommitteeDuties):
item.duties.withValue(epoch, duty):
res.add(duty[])
res
proc getDurationToNextAttestation*(vc: ValidatorClientRef,
slot: Slot): string =
var minSlot = FAR_FUTURE_SLOT
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
let dutySlotTime = duty.data.slot
if (duty.data.slot < minSlot) and (duty.data.slot >= slot):
minSlot = duty.data.slot
if minSlot != FAR_FUTURE_SLOT:
break
if minSlot == FAR_FUTURE_SLOT:
"<unknown>"
else:
$(minSlot.attestation_deadline() - slot.start_beacon_time())
proc getDurationToNextBlock*(vc: ValidatorClientRef, slot: Slot): string =
var minSlot = FAR_FUTURE_SLOT
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
let data = vc.proposers.getOrDefault(epoch)
if not(data.isDefault()):
for item in data.duties:
if item.duty.pubkey in vc.attachedValidators[]:
if (item.duty.slot < minSlot) and (item.duty.slot >= slot):
minSlot = item.duty.slot
if minSlot != FAR_FUTURE_SLOT:
break
if minSlot == FAR_FUTURE_SLOT:
"<unknown>"
else:
$(minSlot.block_deadline() - slot.start_beacon_time())
iterator attesterDutiesForEpoch*(vc: ValidatorClientRef,
epoch: Epoch): DutyAndProof =
for key, item in vc.attesters:
let epochDuties = item.duties.getOrDefault(epoch)
if not(isDefault(epochDuties)):
yield epochDuties
proc syncMembersSubscriptionInfoForEpoch*(
vc: ValidatorClientRef,
epoch: Epoch): seq[SyncCommitteeSubscriptionInfo] =
var res: seq[SyncCommitteeSubscriptionInfo]
for key, item in mpairs(vc.syncCommitteeDuties):
var cur: SyncCommitteeSubscriptionInfo
var initialized = false
item.duties.withValue(epoch, epochDuties):
if not initialized:
cur.validator_index = epochDuties.validator_index
initialized = true
cur.validator_sync_committee_indices.add(
epochDuties.validator_sync_committee_index)
if initialized:
res.add cur
res
proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
vc.beaconClock.now() - deadline
proc getValidatorForDuties*(vc: ValidatorClientRef,
key: ValidatorPubKey, slot: Slot,
slashingSafe = false): Opt[AttachedValidator] =
vc.attachedValidators[].getValidatorForDuties(key, slot, slashingSafe)
proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
# If schedule is present, it MUST not be empty.
doAssert(len(vc.forks) > 0)
var res: Fork
for item in vc.forks:
if item.epoch <= epoch:
res = item
else:
break
res
proc getSubcommitteeIndex*(index: IndexInSyncCommittee): SyncSubcommitteeIndex =
SyncSubcommitteeIndex(uint16(index) div SYNC_SUBCOMMITTEE_SIZE)
proc currentSlot*(vc: ValidatorClientRef): Slot =
vc.beaconClock.now().slotOrZero()
proc addValidator*(vc: ValidatorClientRef, keystore: KeystoreData) =
let
slot = vc.currentSlot()
withdrawalAddress =
if vc.keymanagerHost.isNil:
Opt.none Eth1Address
else:
vc.keymanagerHost[].getValidatorWithdrawalAddress(keystore.pubkey)
perValidatorDefaultFeeRecipient = getPerValidatorDefaultFeeRecipient(
vc.config.defaultFeeRecipient, withdrawalAddress)
feeRecipient = vc.config.validatorsDir.getSuggestedFeeRecipient(
keystore.pubkey, perValidatorDefaultFeeRecipient).valueOr(
perValidatorDefaultFeeRecipient)
gasLimit = vc.config.validatorsDir.getSuggestedGasLimit(
keystore.pubkey, vc.config.suggestedGasLimit).valueOr(
vc.config.suggestedGasLimit)
discard vc.attachedValidators[].addValidator(keystore, feeRecipient, gasLimit)
proc removeValidator*(vc: ValidatorClientRef,
pubkey: ValidatorPubKey) {.async.} =
let validator = vc.attachedValidators[].getValidator(pubkey).valueOr:
return
# Remove validator from ValidatorPool.
vc.attachedValidators[].removeValidator(pubkey)
case validator.kind
of ValidatorKind.Local:
discard
of ValidatorKind.Remote:
# We must close all the REST clients running for the remote validator.
let pending =
block:
var res: seq[Future[void]]
for item in validator.clients:
res.add(item[0].closeWait())
res
await allFutures(pending)
proc getFeeRecipient*(vc: ValidatorClientRef, pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex,
epoch: Epoch): Opt[Eth1Address] =
let dynamicRecipient = vc.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
validatorIdx, epoch)
if dynamicRecipient.isSome():
Opt.some(dynamicRecipient.get())
else:
let
withdrawalAddress =
if vc.keymanagerHost.isNil:
Opt.none Eth1Address
else:
vc.keymanagerHost[].getValidatorWithdrawalAddress(pubkey)
perValidatorDefaultFeeRecipient = getPerValidatorDefaultFeeRecipient(
vc.config.defaultFeeRecipient, withdrawalAddress)
staticRecipient = getSuggestedFeeRecipient(
vc.config.validatorsDir, pubkey, perValidatorDefaultFeeRecipient)
if staticRecipient.isOk():
Opt.some(staticRecipient.get())
else:
Opt.none(Eth1Address)
proc getGasLimit*(vc: ValidatorClientRef,
pubkey: ValidatorPubKey): uint64 =
getSuggestedGasLimit(
vc.config.validatorsDir, pubkey, vc.config.suggestedGasLimit).valueOr:
vc.config.suggestedGasLimit
proc prepareProposersList*(vc: ValidatorClientRef,
epoch: Epoch): seq[PrepareBeaconProposer] =
var res: seq[PrepareBeaconProposer]
for validator in vc.attachedValidators[].items():
if validator.index.isSome():
let
index = validator.index.get()
feeRecipient = vc.getFeeRecipient(validator.pubkey, index, epoch)
if feeRecipient.isSome():
res.add(PrepareBeaconProposer(validator_index: index,
fee_recipient: feeRecipient.get()))
res
proc isDefault*(reg: SignedValidatorRegistrationV1): bool =
(reg.message.timestamp == 0'u64) or (reg.message.gas_limit == 0'u64)
proc isExpired*(vc: ValidatorClientRef,
reg: SignedValidatorRegistrationV1, slot: Slot): bool =
let
regTime = fromUnix(int64(reg.message.timestamp))
regSlot =
block:
let res = vc.beaconClock.toSlot(regTime)
if not(res.afterGenesis):
# This case should not be happend, but it could in case of time jumps
# (time could be modified by admin or ntpd).
return false
uint64(res.slot)
if regSlot > slot:
# This case should not be happened, but if it happens (time could be
# modified by admin or ntpd).
false
else:
if (slot - regSlot) div SLOTS_PER_EPOCH >=
EPOCHS_BETWEEN_VALIDATOR_REGISTRATION:
false
else:
true
proc getValidatorRegistration(
vc: ValidatorClientRef,
validator: AttachedValidator,
timestamp: Time,
fork: Fork
): Result[PendingValidatorRegistration, RegistrationKind] =
if validator.index.isNone():
debug "Validator registration missing validator index",
validator = shortLog(validator)
return err(RegistrationKind.MissingIndex)
let
vindex = validator.index.get()
cached = vc.validatorsRegCache.getOrDefault(validator.pubkey)
currentSlot =
block:
let res = vc.beaconClock.toSlot(timestamp)
if not(res.afterGenesis):
return err(RegistrationKind.IncorrectTime)
res.slot
if cached.isDefault() or vc.isExpired(cached, currentSlot):
let feeRecipient = vc.getFeeRecipient(validator.pubkey, vindex,
currentSlot.epoch())
if feeRecipient.isNone():
debug "Could not get fee recipient for registration data",
validator = shortLog(validator)
return err(RegistrationKind.MissingFee)
let gasLimit = vc.getGasLimit(validator.pubkey)
var registration =
SignedValidatorRegistrationV1(
message: ValidatorRegistrationV1(
fee_recipient:
ExecutionAddress(data: distinctBase(feeRecipient.get())),
gas_limit: gasLimit,
timestamp: uint64(timestamp.toUnix()),
pubkey: validator.pubkey
)
)
let sigfut = validator.getBuilderSignature(fork, registration.message)
if sigfut.finished():
# This is short-path if we able to create signature locally.
if not(sigfut.completed()):
let exc = sigfut.readError()
debug "Got unexpected exception while signing validator registration",
validator = shortLog(validator), error_name = $exc.name,
error_msg = $exc.msg
return err(RegistrationKind.ErrorSignature)
let sigres = sigfut.read()
if sigres.isErr():
debug "Failed to get signature for validator registration",
validator = shortLog(validator), error = sigres.error()
return err(RegistrationKind.NoSignature)
registration.signature = sigres.get()
# Updating cache table with new signed registration data
vc.validatorsRegCache[registration.message.pubkey] = registration
ok(PendingValidatorRegistration(registration: registration, future: nil))
else:
# Remote signature service involved, cache will be updated later.
ok(PendingValidatorRegistration(registration: registration,
future: sigfut))
else:
# Returning cached result.
err(RegistrationKind.Cached)
proc prepareRegistrationList*(
vc: ValidatorClientRef,
timestamp: Time,
fork: Fork
): Future[seq[SignedValidatorRegistrationV1]] {.async.} =
var
messages: seq[SignedValidatorRegistrationV1]
futures: seq[Future[SignatureResult]]
registrations: seq[SignedValidatorRegistrationV1]
total = vc.attachedValidators[].count()
succeed = 0
bad = 0
errors = 0
indexMissing = 0
feeMissing = 0
gasLimit = 0
cached = 0
timed = 0
for validator in vc.attachedValidators[].items():
let res = vc.getValidatorRegistration(validator, timestamp, fork)
if res.isOk():
let preg = res.get()
if preg.future.isNil():
registrations.add(preg.registration)
else:
messages.add(preg.registration)
futures.add(preg.future)
else:
case res.error()
of RegistrationKind.Cached: inc(cached)
of RegistrationKind.IncorrectTime: inc(timed)
of RegistrationKind.NoSignature: inc(bad)
of RegistrationKind.ErrorSignature: inc(errors)
of RegistrationKind.MissingIndex: inc(indexMissing)
of RegistrationKind.MissingFee: inc(feeMissing)
of RegistrationKind.MissingGasLimit: inc(gasLimit)
succeed = len(registrations)
if len(futures) > 0:
await allFutures(futures)
for index, future in futures.pairs():
if future.completed():
let sres = future.read()
if sres.isOk():
var reg = messages[index]
reg.signature = sres.get()
registrations.add(reg)
# Updating cache table
vc.validatorsRegCache[reg.message.pubkey] = reg
inc(succeed)
else:
inc(bad)
else:
inc(errors)
debug "Validator registrations prepared", total = total, succeed = succeed,
cached = cached, bad = bad, errors = errors,
index_missing = indexMissing, fee_missing = feeMissing,
incorrect_time = timed
return registrations
func init*(t: typedesc[ApiNodeFailure], failure: ApiFailure,
request: string, strategy: ApiStrategyKind,
node: BeaconNodeServerRef): ApiNodeFailure =
ApiNodeFailure(node: node, request: request, strategy: Opt.some(strategy),
failure: failure)
func init*(t: typedesc[ApiNodeFailure], failure: ApiFailure,
request: string, strategy: ApiStrategyKind,
node: BeaconNodeServerRef, reason: string): ApiNodeFailure =
ApiNodeFailure(node: node, request: request, strategy: Opt.some(strategy),
failure: failure, reason: reason)
func init*(t: typedesc[ApiNodeFailure], failure: ApiFailure,
request: string, strategy: ApiStrategyKind,
node: BeaconNodeServerRef, status: int,
reason: string): ApiNodeFailure =
ApiNodeFailure(node: node, request: request, strategy: Opt.some(strategy),
failure: failure, status: Opt.some(status), reason: reason)
func init*(t: typedesc[ApiNodeFailure], failure: ApiFailure,
request: string, node: BeaconNodeServerRef, status: int,
reason: string): ApiNodeFailure =
ApiNodeFailure(node: node, request: request,
failure: failure, status: Opt.some(status), reason: reason)
func init*(t: typedesc[ApiNodeFailure], failure: ApiFailure,
request: string, node: BeaconNodeServerRef,
reason: string): ApiNodeFailure =
ApiNodeFailure(node: node, request: request, failure: failure, reason: reason)
proc checkedWaitForSlot*(vc: ValidatorClientRef, destinationSlot: Slot,
offset: TimeDiff,
showLogs: bool): Future[Opt[Slot]] {.async.} =
let
currentTime = vc.beaconClock.now()
currentSlot = currentTime.slotOrZero()
chronosOffset = chronos.nanoseconds(
if offset.nanoseconds < 0: 0'i64 else: offset.nanoseconds)
var timeToSlot = (destinationSlot.start_beacon_time() - currentTime) +
chronosOffset
logScope:
start_time = shortLog(currentTime)
start_slot = shortLog(currentSlot)
dest_slot = shortLog(destinationSlot)
time_to_slot = shortLog(timeToSlot)
while true:
await sleepAsync(timeToSlot)
let
wallTime = vc.beaconClock.now()
wallSlot = wallTime.slotOrZero()
logScope:
wall_time = shortLog(wallTime)
wall_slot = shortLog(wallSlot)
if wallSlot < destinationSlot:
# While we were sleeping, the system clock changed and time moved
# backwards!
if wallSlot + 1 < destinationSlot:
# This is a critical condition where it's hard to reason about what
# to do next - we'll call the attention of the user here by shutting
# down.
if showLogs:
fatal "System time adjusted backwards significantly - " &
"clock may be inaccurate - shutting down"
return Opt.none(Slot)
else:
# Time moved back by a single slot - this could be a minor adjustment,
# for example when NTP does its thing after not working for a while
timeToSlot = destinationSlot.start_beacon_time() - wallTime +
chronosOffset
if showLogs:
warn "System time adjusted backwards, rescheduling slot actions"
continue
elif wallSlot > destinationSlot + SLOTS_PER_EPOCH:
if showLogs:
warn "Time moved forwards by more than an epoch, skipping ahead"
return Opt.some(wallSlot)
elif wallSlot > destinationSlot:
if showLogs:
notice "Missed expected slot start, catching up"
return Opt.some(wallSlot)
else:
return Opt.some(destinationSlot)
proc checkedWaitForNextSlot*(vc: ValidatorClientRef, curSlot: Opt[Slot],
offset: TimeDiff,
showLogs: bool): Future[Opt[Slot]] =
let
currentTime = vc.beaconClock.now()
currentSlot = curSlot.valueOr: currentTime.slotOrZero()
nextSlot = currentSlot + 1
vc.checkedWaitForSlot(nextSlot, offset, showLogs)
proc checkedWaitForNextSlot*(vc: ValidatorClientRef, offset: TimeDiff,
showLogs: bool): Future[Opt[Slot]] =
let
currentTime = vc.beaconClock.now()
currentSlot = currentTime.slotOrZero()
nextSlot = currentSlot + 1
vc.checkedWaitForSlot(nextSlot, offset, showLogs)
proc expectBlock*(vc: ValidatorClientRef, slot: Slot,
confirmations: int = 1): Future[seq[Eth2Digest]] =
var
retFuture = newFuture[seq[Eth2Digest]]("expectBlock")
waiter = BlockWaiter(future: retFuture, count: confirmations)
proc cancellation(udata: pointer) =
vc.blocksSeen.withValue(slot, adata):
adata[].waiters.keepItIf(it.future != retFuture)
proc scheduleCallbacks(data: var BlockDataItem,
waiter: BlockWaiter) =
data.waiters.add(waiter)
for mitem in data.waiters.mitems():
if mitem.count <= len(data.blocks):
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
vc.blocksSeen.mgetOrPut(slot, BlockDataItem()).scheduleCallbacks(waiter)
if not(retFuture.finished()): retFuture.cancelCallback = cancellation
retFuture
proc registerBlock*(vc: ValidatorClientRef, eblck: EventBeaconBlockObject,
node: BeaconNodeServerRef) =
let
wallTime = vc.beaconClock.now()
delay = wallTime - eblck.slot.start_beacon_time()
debug "Block received", slot = eblck.slot,
block_root = shortLog(eblck.block_root), optimistic = eblck.optimistic,
node = node, delay = delay
proc scheduleCallbacks(data: var BlockDataItem,
blck: EventBeaconBlockObject) =
vc.rootsSeen[blck.block_root] = blck.slot
data.blocks.add(blck.block_root)
for mitem in data.waiters.mitems():
if mitem.count >= len(data.blocks):
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
vc.blocksSeen.mgetOrPut(eblck.slot, BlockDataItem()).scheduleCallbacks(eblck)
proc pruneBlocksSeen*(vc: ValidatorClientRef, epoch: Epoch) =
var blocksSeen: Table[Slot, BlockDataItem]
for slot, item in vc.blocksSeen.pairs():
if (slot.epoch() + HISTORICAL_DUTIES_EPOCHS) >= epoch:
blocksSeen[slot] = item
else:
for root in item.blocks: vc.rootsSeen.del(root)
let blockRoot =
if len(item.blocks) == 0:
"<missing>"
elif len(item.blocks) == 1:
shortLog(item.blocks[0])
else:
"[" & item.blocks.mapIt(shortLog(it)).join(", ") & "]"
debug "Block data has been pruned", slot = slot, blocks = blockRoot
vc.blocksSeen = blocksSeen
proc waitForBlock*(
vc: ValidatorClientRef,
slot: Slot,
timediff: TimeDiff,
confirmations: int = 1
) {.async.} =
## This procedure will wait for a block proposal for a ``slot`` received
## by the beacon node.
let
startTime = Moment.now()
waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
slot = slot
timediff = timediff
wait_time = waitTime
debug "Waiting for block proposal"
if waitTime.nanoseconds <= 0'i64:
# We do not have time to wait for block.
return
let blocks =
try:
let timeout = nanoseconds(waitTime.nanoseconds)
await vc.expectBlock(slot, confirmations).wait(timeout)
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block has not been received in time", duration = dur
return
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Block awaiting was interrupted", duration = dur
raise exc
except CatchableError as exc:
let dur = Moment.now() - startTime
error "Unexpected error occured while waiting for block publication",
err_name = exc.name, err_msg = exc.msg, duration = dur
return
let
dur = Moment.now() - startTime
blockRoot =
if len(blocks) == 0:
"<missing>"
elif len(blocks) == 1:
shortLog(blocks[0])
else:
"[" & blocks.mapIt(shortLog(it)).join(", ") & "]"
debug "Block proposal awaited", duration = dur,
block_root = blockRoot
try:
await waitAfterBlockCutoff(vc.beaconClock, slot)
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block cutoff was interrupted", duration = dur
raise exc
iterator chunks*[T](data: openArray[T], maxCount: Positive): seq[T] =
for i in countup(0, len(data) - 1, maxCount):
yield @(data.toOpenArray(i, min(i + maxCount, len(data)) - 1))
func init*(t: typedesc[TimeOffset], duration: Duration): TimeOffset =
TimeOffset(value: duration.nanoseconds)
func init*(t: typedesc[TimeOffset], offset: int64): TimeOffset =
TimeOffset(value: offset)
func abs*(to: TimeOffset): TimeOffset =
TimeOffset(value: abs(to.value))
func milliseconds*(to: TimeOffset): int64 =
if to.value < 0:
-nanoseconds(-to.value).milliseconds
else:
nanoseconds(-to.value).milliseconds
func `<`*(a, b: TimeOffset): bool = a.value < b.value
func `<=`*(a, b: TimeOffset): bool = a.value <= b.value
func `==`*(a, b: TimeOffset): bool = a.value == b.value
func nanoseconds*(to: TimeOffset): int64 = to.value
proc waitForNextEpoch*(service: ClientServiceRef,
delay: Duration) {.async.} =
let
vc = service.client
sleepTime = vc.beaconClock.durationToNextEpoch() + delay
debug "Sleeping until next epoch", service = service.name,
sleep_time = sleepTime, delay = delay
await sleepAsync(sleepTime)
proc waitForNextEpoch*(service: ClientServiceRef): Future[void] =
waitForNextEpoch(service, ZeroDuration)
proc waitForNextSlot*(service: ClientServiceRef) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextSlot()
await sleepAsync(sleepTime)