This commit is contained in:
tersec 2024-01-29 07:35:16 +00:00 committed by GitHub
parent 225ef5e69a
commit 3d7f634e70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 12 deletions

View File

@ -61,7 +61,7 @@ type
blobs*: Opt[BlobSidecars]
maybeFinalized*: bool
## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]]
resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource
@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor,
proc enqueueBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
resfut: Future[Result[void, VerifierError]] = nil,
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
maybeFinalized = false,
validationDur = Duration()) =
withBlck(blck):
@ -756,7 +756,7 @@ proc storeBlock(
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized = false,
validationDur = Duration()): Future[Result[void, VerifierError]] =
validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to

View File

@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent =
pool.outNotFullEvent
proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
filter: set[PeerType]) {.async.} =
filter: set[PeerType]) {.async: (raises: [CancelledError]).} =
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
var fut1 = incomingEvent(eventType).wait()
var fut2 = outgoingEvent(eventType).wait()
try:
discard await one(fut1, fut2)
try:
discard await one(fut1, fut2)
except ValueError:
raiseAssert "one precondition satisfied"
if fut1.finished():
if not(fut2.finished()):
await fut2.cancelAndWait()
@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
outgoingEvent(eventType).clear()
proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotEmptyEvent, filter)
proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotFullEvent, filter)
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
@ -451,7 +454,7 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B],
{PeerType.Outgoing}
proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
peerType: PeerType) {.async.} =
peerType: PeerType) {.async: (raises: [CancelledError]).} =
## This procedure will block until ``pool`` will have an empty space for peer
## of type ``peerType``.
let mask = pool.getPeerSpaceMask(peerType)
@ -459,7 +462,7 @@ proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
await pool.waitNotFullEvent(mask)
proc addPeer*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): Future[PeerStatus] {.async.} =
peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} =
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
##
## This procedure will wait for an empty space in PeerPool ``pool``, if
@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B],
proc acquire*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[A] {.async.} =
PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} =
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
mixin getKey
doAssert(filter != {}, "Filter must not be empty")
@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} =
proc acquire*[A, B](pool: PeerPool[A, B],
number: int,
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[seq[A]] {.async.} =
PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} =
## Acquire ``number`` number of peers from PeerPool ``pool``, which match the
## filter ``filter``.
doAssert(filter != {}, "Filter must not be empty")
@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) =
pool.acqIncPeersCount = 0
pool.acqOutPeersCount = 0
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} =
## Performs "safe" clear. Safe means that it first acquires all the peers
## in PeerPool, and only after that it will reset storage.
var acquired = newSeq[A]()