VC: Fix forks handling. (#3389)
* Trying to debug the finalization issue. * Add debug logs to understand signature issue. * Remove all the debugging helpers. * Initial commit. * Address review comments. * Remove unneeded checks for empty fork schedule. * Fix bellatrix ExecutionAddress serialization/deserialization procedures.
This commit is contained in:
parent
254e0fe2e2
commit
3a80b9951c
|
@ -194,6 +194,7 @@ programMain:
|
|||
beaconNodes: beaconNodes,
|
||||
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
|
||||
nodesAvailable: newAsyncEvent(),
|
||||
forksAvailable: newAsyncEvent()
|
||||
)
|
||||
|
||||
waitFor asyncInit(vc)
|
||||
|
|
|
@ -327,20 +327,25 @@ type
|
|||
func encodeQuantityHex*(x: auto): string =
|
||||
"0x" & x.toHex
|
||||
|
||||
proc fromHex*(T: typedesc[BloomLogs], s: string): T {.raises: [Defect, ValueError].} =
|
||||
proc fromHex*(T: typedesc[BloomLogs], s: string): T {.
|
||||
raises: [Defect, ValueError].} =
|
||||
hexToByteArray(s, result.data)
|
||||
|
||||
proc fromHex*(T: typedesc[ExecutionAddress], s: string): T {.raises: [Defect, ValueError].} =
|
||||
proc fromHex*(T: typedesc[ExecutionAddress], s: string): T {.
|
||||
raises: [Defect, ValueError].} =
|
||||
hexToByteArray(s, result.data)
|
||||
|
||||
proc writeValue*(w: var JsonWriter, a: ExecutionAddress) {.raises: [Defect, IOError].} =
|
||||
w.writeValue $a
|
||||
proc writeValue*(writer: var JsonWriter, value: ExecutionAddress) {.
|
||||
raises: [Defect, IOError].} =
|
||||
writer.writeValue to0xHex(value.data)
|
||||
|
||||
proc readValue*(r: var JsonReader, a: var ExecutionAddress) {.raises: [Defect, IOError, SerializationError].} =
|
||||
proc readValue*(reader: var JsonReader, value: var ExecutionAddress) {.
|
||||
raises: [Defect, IOError, SerializationError].} =
|
||||
try:
|
||||
a = fromHex(type(a), r.readValue(string))
|
||||
hexToByteArray(reader.readValue(string), value.data)
|
||||
except ValueError:
|
||||
raiseUnexpectedValue(r, "Hex string expected")
|
||||
raiseUnexpectedValue(reader,
|
||||
"ExecutionAddress value should be a valid hex string")
|
||||
|
||||
func shortLog*(v: SomeBeaconBlock): auto =
|
||||
(
|
||||
|
|
|
@ -19,8 +19,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
|
|||
if res.isNone():
|
||||
return false
|
||||
res.get()
|
||||
|
||||
let fork = vc.fork.get()
|
||||
let fork = vc.forkAtEpoch(adata.slot.epoch)
|
||||
|
||||
# TODO: signing_root is recomputed in signBlockProposal just after,
|
||||
# but not for locally attached validators.
|
||||
|
@ -96,7 +95,11 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
|
|||
let
|
||||
vc = service.client
|
||||
genesisRoot = vc.beaconGenesis.genesis_validators_root
|
||||
fork = vc.fork.get()
|
||||
slot = proof.aggregate.data.slot
|
||||
fork = vc.forkAtEpoch(slot.epoch)
|
||||
|
||||
debug "Signing aggregate", validator = shortLog(validator),
|
||||
attestation = shortLog(proof.aggregate), fork = fork
|
||||
|
||||
let signature =
|
||||
block:
|
||||
|
@ -112,10 +115,9 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
|
|||
let signedProof = SignedAggregateAndProof(message: proof,
|
||||
signature: signature)
|
||||
|
||||
let slot = proof.aggregate.data.slot
|
||||
let vindex = validator.index.get()
|
||||
|
||||
debug "Sending aggregated attestation",
|
||||
debug "Sending aggregated attestation", fork = fork,
|
||||
attestation = shortLog(signedProof.message.aggregate),
|
||||
validator = shortLog(validator), validator_index = vindex,
|
||||
delay = vc.getDelay(slot.aggregate_deadline())
|
||||
|
|
|
@ -13,7 +13,7 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
|
|||
vc.config.graffiti.get()
|
||||
else:
|
||||
defaultGraffitiBytes()
|
||||
fork = vc.fork.get()
|
||||
fork = vc.forkAtEpoch(slot.epoch)
|
||||
|
||||
debug "Publishing block", validator = shortLog(validator),
|
||||
delay = vc.getDelay(slot.block_deadline()),
|
||||
|
|
|
@ -99,7 +99,8 @@ type
|
|||
runSlotLoop*: Future[void]
|
||||
beaconClock*: BeaconClock
|
||||
attachedValidators*: ValidatorPool
|
||||
fork*: Option[Fork]
|
||||
forks*: seq[Fork]
|
||||
forksAvailable*: AsyncEvent
|
||||
attesters*: AttesterMap
|
||||
proposers*: ProposerMap
|
||||
beaconGenesis*: RestGenesis
|
||||
|
@ -241,3 +242,14 @@ proc getValidator*(vc: ValidatorClientRef,
|
|||
none[AttachedValidator]()
|
||||
else:
|
||||
some(validator)
|
||||
|
||||
proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
|
||||
# If schedule is present, it MUST not be empty.
|
||||
doAssert(len(vc.forks) > 0)
|
||||
var res: Fork
|
||||
for item in vc.forks:
|
||||
if item.epoch <= epoch:
|
||||
res = item
|
||||
else:
|
||||
break
|
||||
res
|
||||
|
|
|
@ -126,7 +126,6 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
|
|||
checkDuty(it) and (it.pubkey in vc.attachedValidators)
|
||||
)
|
||||
dependentRoot = currentRoot.get()
|
||||
fork = vc.fork.get()
|
||||
genesisRoot = vc.beaconGenesis.genesis_validators_root
|
||||
|
||||
let addOrReplaceItems =
|
||||
|
@ -155,6 +154,7 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
|
|||
var validators: seq[AttachedValidator]
|
||||
for item in addOrReplaceItems:
|
||||
let validator = vc.attachedValidators.getValidator(item.duty.pubkey)
|
||||
let fork = vc.forkAtEpoch(item.duty.slot.epoch)
|
||||
let future = validator.getSlotSig(fork, genesisRoot, item.duty.slot)
|
||||
pending.add(future)
|
||||
validators.add(validator)
|
||||
|
@ -299,12 +299,20 @@ proc waitForNextSlot(service: DutiesServiceRef,
|
|||
|
||||
proc attesterDutiesLoop(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
||||
debug "Attester duties loop waiting for fork schedule update"
|
||||
await vc.forksAvailable.wait()
|
||||
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
|
||||
while true:
|
||||
await vc.pollForAttesterDuties()
|
||||
await service.waitForNextSlot(AttesterLoop)
|
||||
|
||||
proc proposerDutiesLoop(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
||||
debug "Proposer duties loop waiting for fork schedule update"
|
||||
await vc.forksAvailable.wait()
|
||||
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
|
||||
while true:
|
||||
await vc.pollForBeaconProposers()
|
||||
await service.waitForNextSlot(ProposerLoop)
|
||||
|
|
|
@ -17,24 +17,20 @@ proc validateForkSchedule(forks: openarray[Fork]): bool {.raises: [Defect].} =
|
|||
current_version = item.current_version
|
||||
true
|
||||
|
||||
proc getCurrentFork(forks: openarray[Fork],
|
||||
epoch: Epoch): Result[Fork, cstring] {.raises: [Defect].} =
|
||||
proc sortForks(forks: openarray[Fork]): Result[seq[Fork], cstring] {.
|
||||
raises: [Defect].} =
|
||||
proc cmp(x, y: Fork): int {.closure.} =
|
||||
if uint64(x.epoch) == uint64(y.epoch): return 0
|
||||
if uint64(x.epoch) < uint64(y.epoch): return -1
|
||||
return 1
|
||||
|
||||
let sortedForks = sorted(forks, cmp)
|
||||
if len(sortedForks) == 0:
|
||||
if len(forks) == 0:
|
||||
return err("Empty fork schedule")
|
||||
|
||||
let sortedForks = sorted(forks, cmp)
|
||||
if not(validateForkSchedule(sortedForks)):
|
||||
return err("Invalid fork schedule")
|
||||
var res: Fork
|
||||
for item in sortedForks:
|
||||
res = item
|
||||
if item.epoch > epoch:
|
||||
break
|
||||
ok(res)
|
||||
ok(sortedForks)
|
||||
|
||||
proc pollForFork(vc: ValidatorClientRef) {.async.} =
|
||||
let sres = vc.getCurrentSlot()
|
||||
|
@ -54,17 +50,18 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} =
|
|||
err_name = exc.name, err_msg = exc.msg
|
||||
return
|
||||
|
||||
let fork =
|
||||
let sortedForks =
|
||||
block:
|
||||
let res = getCurrentFork(forks, currentEpoch)
|
||||
let res = sortForks(forks)
|
||||
if res.isErr():
|
||||
error "Invalid fork schedule received", reason = res.error()
|
||||
return
|
||||
res.get()
|
||||
|
||||
if vc.fork.isNone() or (vc.fork.get() != fork):
|
||||
vc.fork = some(fork)
|
||||
notice "Fork update succeeded", fork = fork
|
||||
if (len(vc.forks) == 0) or (vc.forks != sortedForks):
|
||||
vc.forks = sortedForks
|
||||
notice "Fork schedule updated", fork_schedule = sortedForks
|
||||
vc.forksAvailable.fire()
|
||||
|
||||
proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
|
Loading…
Reference in New Issue