raises for gossip (#5808)

* raises for gossip

* fix light client
This commit is contained in:
Jacek Sieka 2024-01-22 17:34:54 +01:00 committed by GitHub
parent ee798af1dd
commit 6328c77778
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 124 additions and 112 deletions

View File

@ -39,7 +39,7 @@ proc initLightClient*(
let
optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock):
Future[void] {.async.} =
Future[void] {.async: (raises: [CancelledError]).} =
debug "New LC optimistic block",
opt = signedBlock.toBlockId(),
dag = node.dag.head.bid,

View File

@ -28,7 +28,7 @@ from ../validators/action_tracker import ActionTracker, getNextProposalSlot
type
ConsensusManager* = object
expectedSlot: Slot
expectedBlockReceived: Future[bool]
expectedBlockReceived: Future[bool].Raising([CancelledError])
# Validated & Verified
# ----------------------------------------------------------------
@ -97,7 +97,8 @@ proc checkExpectedBlock(self: var ConsensusManager) =
self.expectedBlockReceived.complete(true)
self.expectedBlockReceived = nil # Don't keep completed futures around!
proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] =
proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool]
{.async: (raises: [CancelledError], raw: true).} =
## Return a future that will complete when a head is selected whose slot is
## equal or greater than the given slot, or a new expectation is created
if self.expectedBlockReceived != nil:
@ -153,7 +154,7 @@ func setOptimisticHead*(
self.optimisticHead = (bid: bid, execution_block_hash: execution_block_hash)
proc updateExecutionClientHead(self: ref ConsensusManager,
newHead: BeaconHead): Future[Opt[void]] {.async.} =
newHead: BeaconHead): Future[Opt[void]] {.async: (raises: [CancelledError]).} =
let headExecutionPayloadHash = self.dag.loadExecutionBlockHash(newHead.blck)
if headExecutionPayloadHash.isZero:
@ -318,7 +319,7 @@ proc getGasLimit*(self: ConsensusManager, pubkey: ValidatorPubKey): uint64 =
from ../spec/datatypes/bellatrix import PayloadID
proc runProposalForkchoiceUpdated*(
self: ref ConsensusManager, wallSlot: Slot): Future[Opt[void]] {.async.} =
self: ref ConsensusManager, wallSlot: Slot): Future[Opt[void]] {.async: (raises: [CancelledError]).} =
let
nextWallSlot = wallSlot + 1
(validatorIndex, nextProposer) = self.checkNextProposer(wallSlot).valueOr:
@ -351,46 +352,43 @@ proc runProposalForkchoiceUpdated*(
if headBlockHash.isZero:
return err()
try:
let safeBlockHash = beaconHead.safeExecutionPayloadHash
let safeBlockHash = beaconHead.safeExecutionPayloadHash
withState(self.dag.headState):
template callForkchoiceUpdated(fcPayloadAttributes: auto) =
let (status, _) = await self.elManager.forkchoiceUpdated(
headBlockHash, safeBlockHash,
beaconHead.finalizedExecutionPayloadHash,
payloadAttributes = some fcPayloadAttributes)
debug "Fork-choice updated for proposal", status
withState(self.dag.headState):
template callForkchoiceUpdated(fcPayloadAttributes: auto) =
let (status, _) = await self.elManager.forkchoiceUpdated(
headBlockHash, safeBlockHash,
beaconHead.finalizedExecutionPayloadHash,
payloadAttributes = some fcPayloadAttributes)
debug "Fork-choice updated for proposal", status
static: doAssert high(ConsensusFork) == ConsensusFork.Deneb
when consensusFork >= ConsensusFork.Deneb:
callForkchoiceUpdated(PayloadAttributesV3(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient,
withdrawals:
toEngineWithdrawals get_expected_withdrawals(forkyState.data),
parentBeaconBlockRoot: beaconHead.blck.bid.root.asBlockHash))
elif consensusFork >= ConsensusFork.Capella:
callForkchoiceUpdated(PayloadAttributesV2(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient,
withdrawals:
toEngineWithdrawals get_expected_withdrawals(forkyState.data)))
else:
callForkchoiceUpdated(PayloadAttributesV1(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient))
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg
static: doAssert high(ConsensusFork) == ConsensusFork.Deneb
when consensusFork >= ConsensusFork.Deneb:
callForkchoiceUpdated(PayloadAttributesV3(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient,
withdrawals:
toEngineWithdrawals get_expected_withdrawals(forkyState.data),
parentBeaconBlockRoot: beaconHead.blck.bid.root.asBlockHash))
elif consensusFork >= ConsensusFork.Capella:
callForkchoiceUpdated(PayloadAttributesV2(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient,
withdrawals:
toEngineWithdrawals get_expected_withdrawals(forkyState.data)))
else:
callForkchoiceUpdated(PayloadAttributesV1(
timestamp: Quantity timestamp,
prevRandao: FixedBytes[32] randomData,
suggestedFeeRecipient: feeRecipient))
ok()
proc updateHeadWithExecution*(
self: ref ConsensusManager, initialNewHead: BeaconHead,
getBeaconTimeFn: GetBeaconTimeFn) {.async.} =
getBeaconTimeFn: GetBeaconTimeFn) {.async: (raises: [CancelledError]).} =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.

View File

@ -1117,7 +1117,7 @@ proc forkchoiceUpdated*(m: ELManager,
payloadAttributes: Option[PayloadAttributesV1] |
Option[PayloadAttributesV2] |
Option[PayloadAttributesV3]):
Future[(PayloadExecutionStatus, Option[BlockHash])] {.async.} =
Future[(PayloadExecutionStatus, Option[BlockHash])] {.async: (raises: [CancelledError]).} =
doAssert not headBlockHash.isZero
# Allow finalizedBlockHash to be 0 to avoid sync deadlocks.
@ -1208,12 +1208,20 @@ proc forkchoiceUpdated*(m: ELManager,
finalizedBlockHash: finalizedBlockHash,
payloadAttributes: payloadAttributesV3))
template getSelected: untyped =
let
data =
try:
requests[responseProcessor.selectedResponse.get].read
except CatchableError:
raiseAssert "Only completed requests get selected"
(data.status, data.latestValidHash)
if responseProcessor.disagreementAlreadyDetected:
return (PayloadExecutionStatus.invalid, none BlockHash)
elif responseProcessor.selectedResponse.isSome:
assignNextExpectedPayloadParams()
return (requests[responseProcessor.selectedResponse.get].read.status,
requests[responseProcessor.selectedResponse.get].read.latestValidHash)
return getSelected()
await requestsCompleted or deadline
@ -1225,8 +1233,7 @@ proc forkchoiceUpdated*(m: ELManager,
(PayloadExecutionStatus.invalid, none BlockHash)
elif responseProcessor.selectedResponse.isSome:
assignNextExpectedPayloadParams()
(requests[responseProcessor.selectedResponse.get].read.status,
requests[responseProcessor.selectedResponse.get].read.latestValidHash)
getSelected()
else:
(PayloadExecutionStatus.syncing, none BlockHash)

View File

@ -85,6 +85,8 @@ type
Valid
Timeout
FutureBatchResult = Future[BatchResult].Raising([CancelledError])
Eager = proc(): bool {.gcsafe, raises: [].}
## Callback that returns true if eager processing should be done to lower
## latency at the expense of spending more cycles validating things,
@ -92,7 +94,7 @@ type
BatchItem* = object
sigset: SignatureSet
fut: Future[BatchResult]
fut: FutureBatchResult
Batch* = object
## A batch represents up to BatchedCryptoSize non-aggregated signatures
@ -103,7 +105,7 @@ type
VerifierItem = object
verifier: ref BatchVerifier
signal: ThreadSignalPtr
inflight: Future[void]
inflight: Future[void].Raising([CancelledError])
BatchCrypto* = object
batches: Deque[ref Batch]
@ -129,7 +131,7 @@ type
# Most scheduled checks require this immutable value, so don't require it
# to be provided separately each time
processor: Future[void]
processor: Future[void].Raising([CancelledError])
BatchTask = object
ok: Atomic[bool]
@ -237,7 +239,7 @@ proc spawnBatchVerifyTask(tp: Taskpool, task: ptr BatchTask) =
proc batchVerifyAsync*(
verifier: ref BatchVerifier, signal: ThreadSignalPtr,
batch: ref Batch): Future[bool] {.async.} =
batch: ref Batch): Future[bool] {.async: (raises: [CancelledError]).} =
var task = BatchTask(
setsPtr: makeUncheckedArray(baseAddr batch[].sigsets),
numSets: batch[].sigsets.len,
@ -253,12 +255,17 @@ proc batchVerifyAsync*(
doAssert verifier[].taskpool.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
verifier[].taskpool.spawnBatchVerifyTask(taskPtr)
await signal.wait()
try:
await signal.wait()
except AsyncError as exc:
warn "Batch verification verification failed - report bug", err = exc.msg
return false
task.ok.load()
proc processBatch(
batchCrypto: ref BatchCrypto, batch: ref Batch,
verifier: ref BatchVerifier, signal: ThreadSignalPtr) {.async.} =
verifier: ref BatchVerifier, signal: ThreadSignalPtr) {.async: (raises: [CancelledError]).} =
let
numSets = batch[].sigsets.len()
@ -307,7 +314,7 @@ proc processBatch(
batchCrypto[].complete(batch[], ok)
proc processLoop(batchCrypto: ref BatchCrypto) {.async.} =
proc processLoop(batchCrypto: ref BatchCrypto) {.async: (raises: [CancelledError]).} =
## Process pending crypto check after some time has passed - the time is
## chosen such that there's time to fill the batch but not so long that
## latency across the network is negatively affected
@ -354,7 +361,7 @@ proc scheduleProcessor(batchCrypto: ref BatchCrypto) =
proc verifySoon(
batchCrypto: ref BatchCrypto, name: static string,
sigset: SignatureSet): Future[BatchResult] =
sigset: SignatureSet): Future[BatchResult]{.async: (raises: [CancelledError], raw: true).} =
let
batch = batchCrypto[].getBatch()
fut = newFuture[BatchResult](name)
@ -385,7 +392,7 @@ proc scheduleAttestationCheck*(
batchCrypto: ref BatchCrypto, fork: Fork,
attestationData: AttestationData, pubkey: CookedPubKey,
signature: ValidatorSig
): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
): Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] =
## Schedule crypto verification of an attestation
##
## The buffer is processed:
@ -410,7 +417,7 @@ proc scheduleAggregateChecks*(
signedAggregateAndProof: SignedAggregateAndProof, dag: ChainDAGRef,
attesting_indices: openArray[ValidatorIndex]
): Result[tuple[
aggregatorFut, slotFut, aggregateFut: Future[BatchResult],
aggregatorFut, slotFut, aggregateFut: FutureBatchResult,
sig: CookedSig], cstring] =
## Schedule crypto verification of an aggregate
##
@ -462,7 +469,7 @@ proc scheduleSyncCommitteeMessageCheck*(
batchCrypto: ref BatchCrypto, fork: Fork, slot: Slot,
beacon_block_root: Eth2Digest, pubkey: CookedPubKey,
signature: ValidatorSig
): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
): Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] =
## Schedule crypto verification of an attestation
##
## The buffer is processed:
@ -486,7 +493,7 @@ proc scheduleContributionChecks*(
batchCrypto: ref BatchCrypto,
fork: Fork, signedContributionAndProof: SignedContributionAndProof,
subcommitteeIdx: SyncSubcommitteeIndex, dag: ChainDAGRef): Result[tuple[
aggregatorFut, proofFut, contributionFut: Future[BatchResult],
aggregatorFut, proofFut, contributionFut: FutureBatchResult,
sig: CookedSig], cstring] =
## Schedule crypto verification of all signatures in a
## SignedContributionAndProof message
@ -535,7 +542,7 @@ proc scheduleContributionChecks*(
proc scheduleBlsToExecutionChangeCheck*(
batchCrypto: ref BatchCrypto,
genesis_fork: Fork, signedBLSToExecutionChange: SignedBLSToExecutionChange):
Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
Result[tuple[fut: FutureBatchResult, sig: CookedSig], cstring] =
## Schedule crypto verification of all signatures in a
## SignedBLSToExecutionChange message
##

View File

@ -238,7 +238,7 @@ from ../el/el_manager import
proc expectValidForkchoiceUpdated(
elManager: ELManager, headBlockPayloadAttributesType: typedesc,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
receivedBlock: ForkySignedBeaconBlock): Future[void] {.async.} =
receivedBlock: ForkySignedBeaconBlock): Future[void] {.async: (raises: [CancelledError]).} =
let
(payloadExecutionStatus, _) = await elManager.forkchoiceUpdated(
headBlockHash = headBlockHash,
@ -291,7 +291,7 @@ from ../spec/datatypes/deneb import SignedBeaconBlock, asTrusted, shortLog
proc newExecutionPayload*(
elManager: ELManager, blck: SomeForkyBeaconBlock):
Future[Opt[PayloadExecutionStatus]] {.async.} =
Future[Opt[PayloadExecutionStatus]] {.async: (raises: [CancelledError]).} =
template executionPayload: untyped = blck.body.execution_payload
@ -329,7 +329,7 @@ proc getExecutionValidity(
elManager: ELManager,
blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
deneb.SignedBeaconBlock):
Future[NewPayloadStatus] {.async.} =
Future[NewPayloadStatus] {.async: (raises: [CancelledError]).} =
if not blck.message.is_execution_block:
return NewPayloadStatus.valid # vacuously
@ -413,7 +413,7 @@ proc storeBlock(
blobsOpt: Opt[BlobSidecars],
maybeFinalized = false,
queueTick: Moment = Moment.now(), validationDur = Duration()):
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} =
## storeBlock is the main entry point for unvalidated blocks - all untrusted
## blocks, regardless of origin, pass through here. When storing a block,
## we will add it to the dag and pass it to all block consumers that need
@ -774,7 +774,7 @@ proc addBlock*(
# ------------------------------------------------------------------------------
proc processBlock(
self: ref BlockProcessor, entry: BlockEntry) {.async.} =
self: ref BlockProcessor, entry: BlockEntry) {.async: (raises: [CancelledError]).} =
logScope:
blockRoot = shortLog(entry.blck.root)

View File

@ -367,7 +367,7 @@ proc checkForPotentialDoppelganger(
proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: Attestation, subnet_id: SubnetId,
checkSignature: bool = true): Future[ValidationRes] {.async.} =
checkSignature: bool = true): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
var wallTime = self.getCurrentBeaconTime()
let (afterGenesis, wallSlot) = wallTime.toSlot()
@ -415,7 +415,7 @@ proc processAttestation*(
proc processSignedAggregateAndProof*(
self: ref Eth2Processor, src: MsgSource,
signedAggregateAndProof: SignedAggregateAndProof,
checkSignature = true, checkCover = true): Future[ValidationRes] {.async.} =
checkSignature = true, checkCover = true): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
var wallTime = self.getCurrentBeaconTime()
let (afterGenesis, wallSlot) = wallTime.toSlot()
@ -472,7 +472,7 @@ proc processSignedAggregateAndProof*(
proc processBlsToExecutionChange*(
self: ref Eth2Processor, src: MsgSource,
blsToExecutionChange: SignedBLSToExecutionChange):
Future[ValidationRes] {.async.} =
Future[ValidationRes] {.async: (raises: [CancelledError]).} =
logScope:
blsToExecutionChange = shortLog(blsToExecutionChange)
@ -568,7 +568,7 @@ proc processSyncCommitteeMessage*(
self: ref Eth2Processor, src: MsgSource,
syncCommitteeMsg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex,
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} =
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async: (raises: [CancelledError]).} =
let
wallTime = self.getCurrentBeaconTime()
wallSlot = wallTime.slotOrZero()
@ -612,7 +612,7 @@ proc processSyncCommitteeMessage*(
proc processSignedContributionAndProof*(
self: ref Eth2Processor, src: MsgSource,
contributionAndProof: SignedContributionAndProof,
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} =
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async: (raises: [CancelledError]).} =
let
wallTime = self.getCurrentBeaconTime()
wallSlot = wallTime.slotOrZero()

View File

@ -625,7 +625,7 @@ proc validateAttestation*(
subnet_id: SubnetId, checkSignature: bool):
Future[Result[
tuple[attesting_index: ValidatorIndex, sig: CookedSig],
ValidationError]] {.async.} =
ValidationError]] {.async: (raises: [CancelledError]).} =
# Some of the checks below have been reordered compared to the spec, to
# perform the cheap checks first - in particular, we want to avoid loading
# an `EpochRef` and checking signatures. This reordering might lead to
@ -796,7 +796,7 @@ proc validateAggregate*(
checkSignature = true, checkCover = true):
Future[Result[
tuple[attestingIndices: seq[ValidatorIndex], sig: CookedSig],
ValidationError]] {.async.} =
ValidationError]] {.async: (raises: [CancelledError]).} =
# Some of the checks below have been reordered compared to the spec, to
# perform the cheap checks first - in particular, we want to avoid loading
# an `EpochRef` and checking signatures. This reordering might lead to
@ -1004,7 +1004,7 @@ proc validateAggregate*(
proc validateBlsToExecutionChange*(
pool: ValidatorChangePool, batchCrypto: ref BatchCrypto,
signed_address_change: SignedBLSToExecutionChange,
wallEpoch: Epoch): Future[Result[void, ValidationError]] {.async.} =
wallEpoch: Epoch): Future[Result[void, ValidationError]] {.async: (raises: [CancelledError]).} =
# [IGNORE] `current_epoch >= CAPELLA_FORK_EPOCH`, where `current_epoch` is
# defined by the current wall-clock time.
if not (wallEpoch >= pool.dag.cfg.CAPELLA_FORK_EPOCH):
@ -1149,7 +1149,7 @@ proc validateSyncCommitteeMessage*(
wallTime: BeaconTime,
checkSignature: bool):
Future[Result[
(BlockId, CookedSig, seq[uint64]), ValidationError]] {.async.} =
(BlockId, CookedSig, seq[uint64]), ValidationError]] {.async: (raises: [CancelledError]).} =
block:
# [IGNORE] The message's slot is for the current slot (with a
# `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance), i.e.
@ -1241,7 +1241,7 @@ proc validateContribution*(
wallTime: BeaconTime,
checkSignature: bool
): Future[Result[
(BlockId, CookedSig, seq[ValidatorIndex]), ValidationError]] {.async.} =
(BlockId, CookedSig, seq[ValidatorIndex]), ValidationError]] {.async: (raises: [CancelledError]).} =
block:
# [IGNORE] The contribution's slot is for the current slot
# (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)

View File

@ -439,7 +439,7 @@ proc addObject*(
self: var LightClientProcessor,
src: MsgSource,
obj: SomeForkedLightClientObject,
resfut: Future[Result[void, VerifierError]] = nil) =
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil) =
## Enqueue a Gossip-validated light client object for verification
# Backpressure:
# Only one object is validated at any time -

View File

@ -30,14 +30,14 @@ const
type
MsgTrustedBlockProcessor* =
proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): Future[void] {.
gcsafe, raises: [].}
async: (raises: [CancelledError]).}
OptimisticProcessor* = ref object
getBeaconTime: GetBeaconTimeFn
optimisticVerifier: MsgTrustedBlockProcessor
blocks: Table[Eth2Digest, ref ForkedSignedBeaconBlock]
latestOptimisticSlot: Slot
processFut: Future[void]
processFut: Future[void].Raising([CancelledError])
logMoment: Moment
proc initOptimisticProcessor*(

View File

@ -136,8 +136,8 @@ proc createLightClient(
strictVerification)
proc lightClientVerifier(obj: SomeForkedLightClientObject):
Future[Result[void, VerifierError]] =
let resfut = newFuture[Result[void, VerifierError]]("lightClientVerifier")
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
let resfut = Future[Result[void, VerifierError]].Raising([CancelledError]).init("lightClientVerifier")
lightClient.processor[].addObject(MsgSource.gossip, obj, resfut)
resfut
proc bootstrapVerifier(obj: ForkedLightClientBootstrap): auto =

View File

@ -2076,12 +2076,16 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} =
await allFutures(updateFutures)
reset(updateFutures)
for peer in node.peers.values:
if peer.connectionState != Connected: continue
if peer.failedMetadataRequests > MetadataRequestMaxFailures:
debug "no metadata from peer, kicking it", peer
await peer.disconnect(PeerScoreLow)
updateFutures.add(peer.disconnect(PeerScoreLow))
await allFutures(updateFutures)
await sleepAsync(5.seconds)
@ -2363,7 +2367,8 @@ proc subscribe*(
# Passing in `nil` because we do all message processing in the validator
node.pubsub.subscribe(topic, nil)
proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] =
proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult]
{.async: (raises: [CancelledError], raw: true).} =
let res = newFuture[ValidationResult]("eth2_network.execValidator")
res.complete(v)
res
@ -2406,9 +2411,9 @@ proc addValidator*[MsgType](node: Eth2Node,
proc addAsyncValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
Future[ValidationResult] {.gcsafe, raises: [].} ) =
Future[ValidationResult] {.async: (raises: [CancelledError]).} ) =
proc execValidator(topic: string, message: GossipMsg):
Future[ValidationResult] {.raises: [].} =
Future[ValidationResult] {.async: (raw: true).} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message", len = message.data.len, topic

View File

@ -1662,7 +1662,7 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator(
getAttestationTopic(digest, subnet_id), proc (
attestation: Attestation
): Future[ValidationResult] {.async.} =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processAttestation(
MsgSource.gossip, attestation, subnet_id)))
@ -1672,7 +1672,7 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator(
getAggregateAndProofsTopic(digest), proc (
signedAggregateAndProof: SignedAggregateAndProof
): Future[ValidationResult] {.async.} =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processSignedAggregateAndProof(
MsgSource.gossip, signedAggregateAndProof)))
@ -1716,7 +1716,7 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator(
getSyncCommitteeTopic(digest, idx), proc (
msg: SyncCommitteeMessage
): Future[ValidationResult] {.async.} =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processSyncCommitteeMessage(
MsgSource.gossip, msg, idx)))
@ -1726,7 +1726,7 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator(
getSyncCommitteeContributionAndProofTopic(digest), proc (
msg: SignedContributionAndProof
): Future[ValidationResult] {.async.} =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processSignedContributionAndProof(
MsgSource.gossip, msg)))
@ -1736,7 +1736,7 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addAsyncValidator(
getBlsToExecutionChangeTopic(digest), proc (
msg: SignedBLSToExecutionChange
): Future[ValidationResult] {.async.} =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processBlsToExecutionChange(
MsgSource.gossip, msg)))

View File

@ -106,7 +106,7 @@ programMain:
nil
optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock):
Future[void] {.async.} =
Future[void] {.async: (raises: [CancelledError]).} =
notice "New LC optimistic block",
opt = signedBlock.toBlockId(),
wallSlot = getBeaconTime().slotOrZero

View File

@ -35,7 +35,7 @@ type
Endpoint[Nothing, ForkedLightClientOptimisticUpdate]
ValueVerifier[V] =
proc(v: V): Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
proc(v: V): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
BootstrapVerifier* =
ValueVerifier[ForkedLightClientBootstrap]
UpdateVerifier* =
@ -65,7 +65,7 @@ type
getFinalizedPeriod: GetSyncCommitteePeriodCallback
getOptimisticPeriod: GetSyncCommitteePeriodCallback
getBeaconTime: GetBeaconTimeFn
loopFuture: Future[void]
loopFuture: Future[void].Raising([CancelledError])
func init*(
T: type LightClientManager,
@ -115,8 +115,7 @@ proc doRequest(
e: typedesc[Bootstrap],
peer: Peer,
blockRoot: Eth2Digest
): Future[NetRes[ForkedLightClientBootstrap]] {.
raises: [IOError].} =
): Future[NetRes[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError], raw: true).} =
peer.lightClientBootstrap(blockRoot)
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
@ -126,8 +125,7 @@ proc doRequest(
e: typedesc[UpdatesByRange],
peer: Peer,
key: tuple[startPeriod: SyncCommitteePeriod, count: uint64]
): Future[LightClientUpdatesByRangeResponse] {.
async.} =
): Future[LightClientUpdatesByRangeResponse] {.async: (raises: [ResponseError, CancelledError]).} =
let (startPeriod, count) = key
doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES
let response = await peer.lightClientUpdatesByRange(startPeriod, count)
@ -142,14 +140,14 @@ proc doRequest(
proc doRequest(
e: typedesc[FinalityUpdate],
peer: Peer
): Future[NetRes[ForkedLightClientFinalityUpdate]] =
): Future[NetRes[ForkedLightClientFinalityUpdate]] {.async: (raises: [CancelledError], raw: true).} =
peer.lightClientFinalityUpdate()
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
proc doRequest(
e: typedesc[OptimisticUpdate],
peer: Peer
): Future[NetRes[ForkedLightClientOptimisticUpdate]] =
): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.async: (raises: [CancelledError], raw: true).} =
peer.lightClientOptimisticUpdate()
template valueVerifier[E](
@ -179,7 +177,7 @@ proc workerTask[E](
self: LightClientManager,
e: typedesc[E],
key: E.K
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
var
peer: Peer
didProgress = false
@ -246,12 +244,6 @@ proc workerTask[E](
raise exc
except PeerPoolError as exc:
debug "Failed to acquire peer", exc = exc.msg
except CatchableError as exc:
if peer != nil:
peer.updateScore(PeerScoreNoValues)
debug "Unexpected exception while receiving value", exc = exc.msg,
endpoint = E.name, peer, peer_score = peer.getScore()
raise exc
finally:
if peer != nil:
self.network.peerPool.release(peer)
@ -261,13 +253,13 @@ proc query[E](
self: LightClientManager,
e: typedesc[E],
key: E.K
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
const PARALLEL_REQUESTS = 2
var workers: array[PARALLEL_REQUESTS, Future[bool]]
let
progressFut = newFuture[void]("lcmanProgress")
doneFut = newFuture[void]("lcmanDone")
progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress")
doneFut = Future[void].Raising([CancelledError]).init("lcmanDone")
var
numCompleted = 0
maxCompleted = workers.len
@ -300,7 +292,10 @@ proc query[E](
workers[i].complete(false)
# Wait for any worker to report progress, or for all workers to finish
discard await race(progressFut, doneFut)
try:
discard await race(progressFut, doneFut)
except ValueError:
raiseAssert "race API invariant"
finally:
for i in 0 ..< maxCompleted:
if workers[i] == nil:
@ -330,11 +325,11 @@ proc query[E](
template query[E](
self: LightClientManager,
e: typedesc[E]
): Future[bool] =
): Future[bool].Raising([CancelledError]) =
self.query(e, Nothing())
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.6/specs/altair/light-client/light-client.md#light-client-sync-process
proc loop(self: LightClientManager) {.async.} =
proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
var nextSyncTaskTime = self.getBeaconTime()
while true:
# Periodically wake and check for changes
@ -391,8 +386,8 @@ proc start*(self: var LightClientManager) =
doAssert self.loopFuture == nil
self.loopFuture = self.loop()
proc stop*(self: var LightClientManager) {.async.} =
proc stop*(self: var LightClientManager) {.async: (raises: []).} =
## Stop light client manager's loop.
if self.loopFuture != nil:
await self.loopFuture.cancelAndWait()
await noCancel self.loopFuture.cancelAndWait()
self.loopFuture = nil

View File

@ -576,7 +576,7 @@ suite "Validator Client test suite":
else:
return await request.respond(Http404, "Page not found")
else:
return dumbResponse()
return defaultResponse()
let server = createServer(initTAddress("127.0.0.1:0"), process, false)
server.start()
@ -660,7 +660,7 @@ suite "Validator Client test suite":
else:
return await request.respond(Http404, "Page not found")
else:
return dumbResponse()
return defaultResponse()
let server = createServer(initTAddress("127.0.0.1:0"), process, false)
server.start()

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 3ca2c5e6b510c15ce88c94ed25731b30f7ad46b5
Subproject commit e296ae30c84bdd1f0b12c50ab551ed080f8a815c