simplify eth2_network error handling (#5765)

This PR gets rid of a bunch of redundant exception handling through
async raises guarantees.

More can be removed once libp2p gets properly annotated.
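As a rough, self-contained sketch of the chronos pattern this change adopts (the names `tick` and `workers` are illustrative, not taken from the diff): a proc annotated with `async: (raises: [...])` has its exception set checked at compile time, and the matching future type can be written as `Future[T].Raising([...])` for fields and containers.

```nim
import chronos

# With a raises annotation the compiler checks, at compile time, that the proc
# can only raise what is listed, so callers no longer need defensive
# `except CatchableError` blocks around every await.
proc tick() {.async: (raises: [CancelledError]).} =
  await sleepAsync(10.milliseconds)

# The matching future type can be spelled out for object fields and containers,
# as this PR does for `resfut`, `connWorkers` and the heartbeat futures:
var workers: seq[Future[void].Raising([CancelledError])]
workers.add tick()
waitFor workers[0]
```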
Jacek Sieka 2024-01-19 22:05:52 +01:00 committed by GitHub
parent a5daa6d7e9
commit 3ff9b69bf1
13 changed files with 445 additions and 495 deletions

View File

@ -61,7 +61,7 @@ type
blobs*: Opt[BlobSidecars]
maybeFinalized*: bool
## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]]
resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource
@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor,
proc enqueueBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
resfut: Future[Result[void, VerifierError]] = nil,
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
maybeFinalized = false,
validationDur = Duration()) =
withBlck(blck):
@ -756,7 +756,7 @@ proc storeBlock(
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized = false,
validationDur = Duration()): Future[Result[void, VerifierError]] =
validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to

View File

@ -74,7 +74,7 @@ type
seenThreshold*: chronos.Duration
connQueue: AsyncQueue[PeerAddr]
seenTable: Table[PeerId, SeenItem]
connWorkers: seq[Future[void]]
connWorkers: seq[Future[void].Raising([CancelledError])]
connTable: HashSet[PeerId]
forkId*: ENRForkID
discoveryForkId*: ENRForkID
@ -83,8 +83,8 @@ type
peers*: Table[PeerId, Peer]
directPeers*: DirectPeers
validTopics: HashSet[string]
peerPingerHeartbeatFut: Future[void]
peerTrimmerHeartbeatFut: Future[void]
peerPingerHeartbeatFut: Future[void].Raising([CancelledError])
peerTrimmerHeartbeatFut: Future[void].Raising([CancelledError])
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn
@ -167,8 +167,8 @@ type
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [].}
NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe, raises: [].}
OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe, raises: [].}
OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe, raises: [].}
OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.async: (raises: [CancelledError]).}
OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.async: (raises: [CancelledError]).}
ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [].}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [].}
@ -183,9 +183,6 @@ type
# erroneous request-specific responses.
PeerScoreLow = 237 # 79 * 3
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
TransmissionError* = object of CatchableError
Eth2NetworkingErrorKind* = enum
@ -207,6 +204,8 @@ type
InvalidContextBytes
ResponseChunkOverflow
UnknownError
Eth2NetworkingError = object
case kind*: Eth2NetworkingErrorKind
of ReceivedErrorResponse:
@ -340,14 +339,22 @@ func shortProtocolId(protocolId: string): string =
proc openStream(node: Eth2Node,
peer: Peer,
protocolId: string): Future[Connection] {.async.} =
protocolId: string): Future[NetRes[Connection]]
{.async: (raises: [CancelledError]).} =
# When dialing here, we do not provide addresses - all new connection
# attempts are handled via `connect` which also takes into account
# reconnection timeouts
let
conn = await dial(node.switch, peer.peerId, protocolId)
return conn
try:
ok await dial(node.switch, peer.peerId, protocolId)
except LPError as exc:
debug "Dialling failed", exc = exc.msg
neterr BrokenConnection
except CancelledError as exc:
raise exc
except CatchableError as exc:
# TODO remove once libp2p supports `raises`
warn "Unknown error when opening stream", exc = exc.msg
neterr UnknownError
proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.}
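Reduced to a self-contained sketch, the `openStream` conversion above looks like this (`legacyDial` and `openChecked` are illustrative stand-ins, not the real libp2p API): failures from the un-annotated layer become an error value, while cancellation keeps propagating as an exception.

```nim
import chronos, results

proc legacyDial(): Future[string] {.async.} =   # un-annotated, may raise anything
  return "connection"

proc openChecked(): Future[Result[string, string]] {.
    async: (raises: [CancelledError]).} =
  try:
    ok await legacyDial()
  except CancelledError as exc:
    raise exc                  # cancellation always propagates as an exception
  except CatchableError as exc:
    err exc.msg                # everything else becomes an error value

echo waitFor(openChecked())
```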
@ -523,8 +530,11 @@ proc addSeen(network: Eth2Node, peerId: PeerId,
network.seenTable[peerId] = item
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
# TODO(zah): How should we notify the other peer?
notifyOtherPeer = false) {.async: (raises: [CancelledError]).} =
# Per the specification, we MAY send a disconnect reason to the other peer but
# we currently don't - the fact that we're disconnecting is obvious and the
# reason already known (wrong network is known from status message) or doesn't
# greatly matter for the listening side (since it can't be trusted anyway)
try:
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
@ -540,12 +550,16 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
SeenTablePenaltyError
peer.network.addSeen(peer.peerId, seenTime)
await peer.network.switch.disconnect(peer.peerId)
except CatchableError:
# We do not care about exceptions in disconnection procedure.
trace "Exception while disconnecting peer", peer = peer.peerId,
reason = reason
except CancelledError as exc:
raise exc
except CatchableError as exc:
# switch.disconnect shouldn't raise
warn "Unexpected error while disconnecting peer",
peer = peer.peerId,
reason = reason,
exc = exc.msg
proc releasePeer*(peer: Peer) =
proc releasePeer(peer: Peer) =
## Checks for peer's score and disconnects peer if score is less than
## `PeerScoreLowLimit`.
if peer.connectionState notin {ConnectionState.Disconnecting,
@ -641,22 +655,33 @@ proc sendErrorResponse(peer: Peer,
peer, responseCode, errMsg = formatErrorMsg(errMsg)
conn.writeChunk(Opt.some responseCode, SSZ.encode(errMsg))
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte]) {.async.} =
var
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte])
{.async: (raises: [CancelledError]).} =
# Notifications are sent as a best effort, ie errors are not reported back
# to the caller
let
deadline = sleepAsync RESP_TIMEOUT_DUR
streamFut = peer.network.openStream(peer, protocolId)
streamRes = awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline):
debug "Timeout while opening stream for notification", peer, protocolId
return
await streamFut or deadline
let stream = streamRes.valueOr:
debug "Could not open stream for notification",
peer, protocolId, error = streamRes.error
return
if not streamFut.finished:
await streamFut.cancelAndWait()
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
try:
await stream.writeChunk(Opt.none ResponseCode, requestBytes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Error while writing notification", peer, protocolId, exc = exc.msg
finally:
await stream.close()
try:
await noCancel stream.close()
except CatchableError as exc:
warn "Unexpected error while closing notification stream",
peer, protocolId, exc = exc.msg
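The `noCancel` close is the notable addition here. A minimal sketch of the idea, with stand-in procs rather than real libp2p streams: the cleanup step is shielded from any cancellation in progress, so the stream is always released.

```nim
import chronos

proc closeStream() {.async: (raises: [CancelledError]).} =
  await sleepAsync(1.milliseconds)     # stands in for stream.close()

proc send() {.async: (raises: [CancelledError]).} =
  try:
    await sleepAsync(5.milliseconds)   # stands in for writing the chunk
  finally:
    # `noCancel` shields the close step from any cancellation in progress,
    # so the stream is always released before `send` returns.
    await noCancel closeStream()

waitFor send()
```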
proc sendResponseChunkBytesSZ(
response: UntypedResponse, uncompressedLen: uint64,
@ -687,13 +712,17 @@ template sendUserHandlerResultAsChunkImpl*(stream: Connection,
writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerResult))
proc uncompressFramedStream(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]]
{.async.} =
expectedSize: int): Future[Result[seq[byte], string]]
{.async: (raises: [CancelledError]).} =
var header: array[framingHeader.len, byte]
try:
await conn.readExactly(addr header[0], header.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Unexpected EOF before snappy header"
except CancelledError as exc:
raise exc
except CatchableError as exc:
return err "Unexpected error reading header: " & exc.msg
if header != framingHeader:
return err "Incorrect snappy header"
@ -712,6 +741,10 @@ proc uncompressFramedStream(conn: Connection,
await conn.readExactly(addr frameHeader[0], frameHeader.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Snappy frame header missing"
except CancelledError as exc:
raise exc
except CatchableError as exc:
return err "Unexpected error reading frame header: " & exc.msg
let (id, dataLen) = decodeFrameHeader(frameHeader)
@ -726,6 +759,10 @@ proc uncompressFramedStream(conn: Connection,
await conn.readExactly(addr frameData[0], dataLen)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Incomplete snappy frame"
except CancelledError as exc:
raise exc
except CatchableError as exc:
return err "Unexpected error reading frame data: " & exc.msg
if id == chunkCompressed:
if dataLen < 6: # At least CRC + 2 bytes of frame data
@ -809,7 +846,8 @@ template gossipMaxSize(T: untyped): uint32 =
static: doAssert maxSize <= GOSSIP_MAX_SIZE
maxSize.uint32
proc readVarint2(conn: Connection): Future[NetRes[uint64]] {.async.} =
proc readVarint2(conn: Connection): Future[NetRes[uint64]] {.
async: (raises: [CancelledError]).} =
try:
ok await conn.readVarint()
except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError
@ -820,9 +858,15 @@ proc readVarint2(conn: Connection): Future[NetRes[uint64]] {.async.} =
neterr UnexpectedEOF
except InvalidVarintError:
neterr InvalidSizePrefix
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Unexpected error", exc = exc.msg
neterr UnknownError
proc readChunkPayload*(conn: Connection, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
MsgType: type): Future[NetRes[MsgType]]
{.async: (raises: [CancelledError]).} =
let
sm = now(chronos.Moment)
size = ? await readVarint2(conn)
@ -844,19 +888,26 @@ proc readChunkPayload*(conn: Connection, peer: Peer,
# not be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm,
uint64(10 + size))
try:
ok SSZ.decode(data, MsgType)
except SerializationError:
neterr InvalidSszBytes
proc readResponseChunk(
conn: Connection, peer: Peer, MsgType: typedesc):
Future[NetRes[MsgType]] {.async.} =
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
mixin readChunkPayload
try:
var responseCodeByte: byte
try:
await conn.readExactly(addr responseCodeByte, 1)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr PotentiallyExpectedEOF
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Unexpected error", exc = exc.msg
return neterr UnknownError
static: assert ResponseCode.low.ord == 0
if responseCodeByte > ResponseCode.high.byte:
@ -866,9 +917,7 @@ proc readResponseChunk(
case responseCode:
of InvalidRequest, ServerError, ResourceUnavailable:
let
errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg)
errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
errorMsg = ? await readChunkPayload(conn, peer, ErrorMsg)
errorMsgStr = toPrettyString(errorMsg.asSeq)
debug "Error response from peer", responseCode, errMsg = errorMsgStr
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
@ -879,11 +928,9 @@ proc readResponseChunk(
return await readChunkPayload(conn, peer, MsgType)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr UnexpectedEOF
proc readResponse(conn: Connection, peer: Peer,
MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
MsgType: type, timeout: Duration): Future[NetRes[MsgType]]
{.async: (raises: [CancelledError]).} =
when MsgType is List:
type E = MsgType.T
var results: MsgType
@ -897,7 +944,7 @@ proc readResponse(conn: Connection, peer: Peer,
let nextFut = conn.readResponseChunk(peer, E)
if not await nextFut.withTimeout(timeout):
return neterr(ReadResponseTimeout)
let nextRes = nextFut.read()
let nextRes = await nextFut
if nextRes.isErr:
if nextRes.error.kind == PotentiallyExpectedEOF:
trace "EOF chunk", conn, err = nextRes.error
@ -914,15 +961,19 @@ proc readResponse(conn: Connection, peer: Peer,
let nextFut = conn.readResponseChunk(peer, MsgType)
if not await nextFut.withTimeout(timeout):
return neterr(ReadResponseTimeout)
return nextFut.read()
return await nextFut # Guaranteed to complete without waiting
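The `await nextFut` that replaces `read()` works because the future is already finished at that point. A self-contained sketch of the idiom (names are illustrative):

```nim
import chronos

proc nextChunk(): Future[int] {.async: (raises: [CancelledError]).} =
  await sleepAsync(5.milliseconds)
  return 7

proc readOne(): Future[int] {.async: (raises: [CancelledError]).} =
  let fut = nextChunk()
  if not await fut.withTimeout(100.milliseconds):
    await fut.cancelAndWait()          # stop the pending work on timeout
    return 0
  # `fut` has already finished, so this await returns (or re-raises) at once;
  # unlike `read()`, it stays within the checked raises list.
  return await fut

echo waitFor(readOne())
```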
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte],
ResponseMsg: type,
timeout: Duration): Future[NetRes[ResponseMsg]]
{.async.} =
let deadline = sleepAsync timeout
let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
deadline): return neterr StreamOpenTimeout
{.async: (raises: [CancelledError]).} =
let
deadline = sleepAsync timeout
streamRes =
awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline):
return neterr StreamOpenTimeout
stream = ?streamRes
try:
# Send the request
# Some clients don't want a length sent for empty requests
@ -942,13 +993,18 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte],
peer.updateScore(PeerScoreInvalidRequest)
else:
peer.updateScore(PeerScorePoorRequest)
return res
except SerializationError as exc:
# Yay for both exceptions and results!
peer.updateScore(PeerScoreInvalidRequest)
res
except CancelledError as exc:
raise exc
except CatchableError:
peer.updateScore(PeerScorePoorRequest)
neterr BrokenConnection
finally:
await stream.closeWithEOF()
try:
await noCancel stream.closeWithEOF()
except CatchableError as exc:
warn "Unexpected error while closing stream",
peer, protocolId, exc = exc.msg
proc init*(T: type MultipleChunksResponse, peer: Peer, conn: Connection): T =
T(UntypedResponse(peer: peer, stream: conn))
@ -988,7 +1044,7 @@ template sendSSZ*[M](
doAssert UntypedResponse(r).writtenChunks == 0
sendResponseChunk(UntypedResponse(r), val, contextBytes)
proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} =
proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async: (raises: [CancelledError]).} =
# Loop down serially because it's easier to reason about the connection state
# when there are fewer async races, specially during setup
for protocol in peer.network.protocols:
@ -1036,7 +1092,7 @@ proc implementSendProcBody(sendProc: SendProc) =
proc handleIncomingStream(network: Eth2Node,
conn: Connection,
protocolId: string,
MsgType: type) {.async.} =
MsgType: type) {.async: (raises: [CancelledError]).} =
mixin callUserHandler, RecType
type MsgRec = RecType(MsgType)
@ -1057,7 +1113,6 @@ proc handleIncomingStream(network: Eth2Node,
# We got incoming stream request while disconnected or disconnecting.
debug "Got incoming request from disconnected peer", peer = peer,
message = msgName
await conn.closeWithEOF()
return
of Connecting:
# We got incoming stream request while handshake is not yet finished,
@ -1114,10 +1169,6 @@ proc handleIncomingStream(network: Eth2Node,
errorMsgLit "Request full data not sent in time")
return
except SerializationError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.formatMsg("msg")
finally:
# The request quota is shared between all requests - it represents the
# cost to perform a service on behalf of a client and is incurred
@ -1182,27 +1233,33 @@ proc handleIncomingStream(network: Eth2Node,
of ResponseChunkOverflow:
(InvalidRequest, errorMsgLit "Too many chunks in response")
of UnknownError:
(InvalidRequest, errorMsgLit "Unknown error while processing request")
await sendErrorResponse(peer, conn, responseCode, errMsg)
return
try:
# logReceivedMsg(peer, MsgType(msg.get))
await callUserHandler(MsgType, peer, conn, msg.get)
except InvalidInputsError as err:
except InvalidInputsError as exc:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.msg
except ResourceUnavailableError as err:
returnResourceUnavailable err.msg
except CatchableError as err:
returnInvalidRequest exc.msg
except ResourceUnavailableError as exc:
returnResourceUnavailable exc.msg
except CatchableError as exc:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
await sendErrorResponse(peer, conn, ServerError, ErrorMsg err.msg.toBytes)
await sendErrorResponse(peer, conn, ServerError, ErrorMsg exc.msg.toBytes)
except CatchableError as err:
except CatchableError as exc:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
debug "Error processing an incoming request", err = err.msg, msgName
debug "Error processing an incoming request", exc = exc.msg, msgName
finally:
await conn.closeWithEOF()
try:
await noCancel conn.closeWithEOF()
except CatchableError as exc:
warn "Unexpected error while closing incoming connection", exc = exc.msg
releasePeer(peer)
proc toPeerAddr*(r: enr.TypedRecord,
@ -1264,7 +1321,7 @@ proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool =
else:
true
proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async: (raises: [CancelledError]).} =
## Establish connection with remote peer identified by address ``peerAddr``.
logScope:
peer = peerAddr.peerId
@ -1299,7 +1356,7 @@ proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
inc nbc_failed_dials
node.addSeen(peerAddr.peerId, SeenTableTimeDeadPeer)
proc connectWorker(node: Eth2Node, index: int) {.async.} =
proc connectWorker(node: Eth2Node, index: int) {.async: (raises: [CancelledError]).} =
debug "Connection worker started", index = index
while true:
# This loop will never produce HIGH CPU usage because it will wait
@ -1575,7 +1632,7 @@ proc resolvePeer(peer: Peer) =
nbc_resolve_time.observe(delay.toFloatSeconds())
debug "Peer's ENR recovered", delay
proc handlePeer*(peer: Peer) {.async.} =
proc handlePeer*(peer: Peer) {.async: (raises: [CancelledError]).} =
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
case res:
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
@ -1607,7 +1664,9 @@ proc handlePeer*(peer: Peer) {.async.} =
debug "Peer successfully connected", peer = peer,
connections = peer.connections
proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} =
proc onConnEvent(
node: Eth2Node, peerId: PeerId, event: ConnEvent) {.
async: (raises: [CancelledError]).} =
let peer = node.getPeer(peerId)
case event.kind
of ConnEventKind.Connected:
@ -1632,7 +1691,12 @@ proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} =
# we might end up here
debug "Got connection attempt from peer that we are disconnecting",
peer = peerId
try:
await node.switch.disconnect(peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Unexpected error while disconnecting peer", exc = exc.msg
return
of None:
# We have established a connection with the new peer.
@ -1768,23 +1832,22 @@ proc startListening*(node: Eth2Node) {.async.} =
if node.discoveryEnabled:
try:
node.discovery.open()
except CatchableError as err:
except CatchableError as exc:
fatal "Failed to start discovery service. UDP port may be already in use",
err = err.msg
exc = exc.msg
quit 1
try:
await node.switch.start()
except CatchableError as err:
except CatchableError as exc:
fatal "Failed to start LibP2P transport. TCP port may be already in use",
err = err.msg
exc = exc.msg
quit 1
proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
proc start*(node: Eth2Node) {.async.} =
proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.async: (raises: [CancelledError]).}
proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.async: (raises: [CancelledError]).}
proc start*(node: Eth2Node) {.async: (raises: [CancelledError]).} =
proc onPeerCountChanged() =
trace "Number of peers has been changed", length = len(node.peerPool)
nbc_peers.set int64(len(node.peerPool))
@ -1809,7 +1872,7 @@ proc start*(node: Eth2Node) {.async.} =
node.peerPingerHeartbeatFut = node.peerPingerHeartbeat()
node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat()
proc stop*(node: Eth2Node) {.async.} =
proc stop*(node: Eth2Node) {.async: (raises: [CancelledError]).} =
# Ignore errors in futures, since we're shutting down (but log them on the
# TRACE level, if a timeout is reached).
var waitedFutures =
@ -1980,17 +2043,14 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
import ./peer_protocol
export peer_protocol
proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} =
proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async: (raises: [CancelledError]).} =
trace "updating peer metadata", peerId
var peer = node.getPeer(peerId)
#getMetaData can fail with an exception
let newMetadata =
try:
tryGet(await peer.getMetadata_v2())
except CatchableError as exc:
debug "Failed to retrieve metadata from peer!", peerId, msg=exc.msg
let
peer = node.getPeer(peerId)
newMetadataRes = await peer.getMetadata_v2()
newMetadata = newMetadataRes.valueOr:
debug "Failed to retrieve metadata from peer!", peerId, error = newMetadataRes.error
peer.failedMetadataRequests.inc()
return
@ -2003,7 +2063,7 @@ const
MetadataRequestFrequency = 30.minutes
MetadataRequestMaxFailures = 3
proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
proc peerPingerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} =
while true:
let heartbeatStart_m = Moment.now()
var updateFutures: seq[Future[void]]
@ -2022,24 +2082,21 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
if peer.failedMetadataRequests > MetadataRequestMaxFailures:
debug "no metadata from peer, kicking it", peer
asyncSpawn peer.disconnect(PeerScoreLow)
await peer.disconnect(PeerScoreLow)
await sleepAsync(5.seconds)
proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} =
proc peerTrimmerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} =
# Disconnect peers in excess of the (soft) max peer count
while true:
# Peer trimmer
# Only count Connected peers (to avoid counting Disconnecting ones)
let
connectedPeers = node.peers.values.countIt(
it.connectionState == Connected)
excessPeers = connectedPeers - node.wantedPeers
# Only count Connected peers
# (to avoid counting Disconnecting ones)
var connectedPeers = 0
for peer in node.peers.values:
if peer.connectionState == Connected:
inc connectedPeers
let excessPeers = connectedPeers - node.wantedPeers
if excessPeers > 0:
# Let chronos take back control every kick
# Let chronos take back control every trimming
node.trimConnections(1)
await sleepAsync(1.seconds div max(1, excessPeers))
@ -2183,7 +2240,6 @@ proc createEth2Node*(rng: ref HmacDrbgContext,
config.nat, config.listenAddress, config.tcpPort, config.udpPort,
clientId)
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg
directPeers = block:
var res: DirectPeers
@ -2389,19 +2445,26 @@ proc gossipEncode(msg: auto): seq[byte] =
snappy.encode(uncompressed)
proc broadcast(node: Eth2Node, topic: string, msg: seq[byte]):
Future[Result[void, cstring]] {.async.} =
let peers = await node.pubsub.publish(topic, msg)
Future[SendResult] {.async: (raises: [CancelledError]).} =
let peers =
try:
await node.pubsub.publish(topic, msg)
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Unknown error during broadcast", exc = exc.msg
return err("Broadcast failed")
# TODO remove workaround for sync committee BN/VC log spam
if peers > 0 or find(topic, "sync_committee_") != -1:
inc nbc_gossip_messages_sent
return ok()
ok()
else:
# Increments libp2p_gossipsub_failed_publish metric
return err("No peers on libp2p topic")
err("No peers on libp2p topic")
proc broadcast(node: Eth2Node, topic: string, msg: auto):
Future[Result[void, cstring]] =
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
# Avoid {.async.} copies of message while broadcasting
broadcast(node, topic, gossipEncode(msg))
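The `raw: true` flag used for these thin wrappers deserves a sketch (illustrative names, not the real broadcast API): the body is not transformed by the async macro and must itself produce the future, so the wrapper can forward the inner future without copying the message again.

```nim
import chronos

proc publish(data: seq[byte]): Future[int] {.async: (raises: [CancelledError]).} =
  await sleepAsync(1.milliseconds)
  return data.len

# `raw: true`: the body is not transformed and may not use `await`; the call
# below already yields a future with the right raises set, so it is returned
# as-is instead of being wrapped (and `data` copied) a second time.
proc publishEncoded(data: seq[byte]): Future[int] {.
    async: (raises: [CancelledError], raw: true).} =
  publish(data)

echo waitFor(publishEncoded(@[1'u8, 2, 3]))
```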
@ -2486,7 +2549,7 @@ proc getWallEpoch(node: Eth2Node): Epoch =
proc broadcastAttestation*(
node: Eth2Node, subnet_id: SubnetId, attestation: Attestation):
Future[SendResult] =
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
# Regardless of the contents of the attestation,
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/altair/p2p-interface.md#transitioning-the-gossip
# implies that pre-fork, messages using post-fork digests might be
@ -2499,63 +2562,72 @@ proc broadcastAttestation*(
node.broadcast(topic, attestation)
proc broadcastVoluntaryExit*(
node: Eth2Node, exit: SignedVoluntaryExit): Future[SendResult] =
node: Eth2Node, exit: SignedVoluntaryExit):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getVoluntaryExitsTopic(node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, exit)
proc broadcastAttesterSlashing*(
node: Eth2Node, slashing: AttesterSlashing): Future[SendResult] =
node: Eth2Node, slashing: AttesterSlashing):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getAttesterSlashingsTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, slashing)
proc broadcastProposerSlashing*(
node: Eth2Node, slashing: ProposerSlashing): Future[SendResult] =
node: Eth2Node, slashing: ProposerSlashing):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getProposerSlashingsTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, slashing)
proc broadcastBlsToExecutionChange*(
node: Eth2Node, bls_to_execution_change: SignedBLSToExecutionChange):
Future[SendResult] =
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getBlsToExecutionChangeTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, bls_to_execution_change)
proc broadcastAggregateAndProof*(
node: Eth2Node, proof: SignedAggregateAndProof): Future[SendResult] =
node: Eth2Node, proof: SignedAggregateAndProof):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getAggregateAndProofsTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, proof)
proc broadcastBeaconBlock*(
node: Eth2Node, blck: phase0.SignedBeaconBlock): Future[SendResult] =
node: Eth2Node, blck: phase0.SignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getBeaconBlocksTopic(node.forkDigests.phase0)
node.broadcast(topic, blck)
proc broadcastBeaconBlock*(
node: Eth2Node, blck: altair.SignedBeaconBlock): Future[SendResult] =
node: Eth2Node, blck: altair.SignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getBeaconBlocksTopic(node.forkDigests.altair)
node.broadcast(topic, blck)
proc broadcastBeaconBlock*(
node: Eth2Node, blck: bellatrix.SignedBeaconBlock): Future[SendResult] =
node: Eth2Node, blck: bellatrix.SignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix)
node.broadcast(topic, blck)
proc broadcastBeaconBlock*(
node: Eth2Node, blck: capella.SignedBeaconBlock): Future[SendResult] =
node: Eth2Node, blck: capella.SignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getBeaconBlocksTopic(node.forkDigests.capella)
node.broadcast(topic, blck)
proc broadcastBeaconBlock*(
node: Eth2Node, blck: deneb.SignedBeaconBlock): Future[SendResult] =
node: Eth2Node, blck: deneb.SignedBeaconBlock):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getBeaconBlocksTopic(node.forkDigests.deneb)
node.broadcast(topic, blck)
proc broadcastBlobSidecar*(
node: Eth2Node, subnet_id: BlobId, blob: deneb.BlobSidecar):
Future[SendResult] =
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let
forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch)
topic = getBlobSidecarTopic(forkPrefix, subnet_id)
@ -2563,27 +2635,29 @@ proc broadcastBlobSidecar*(
proc broadcastSyncCommitteeMessage*(
node: Eth2Node, msg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] =
subcommitteeIdx: SyncSubcommitteeIndex):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getSyncCommitteeTopic(
node.forkDigestAtEpoch(node.getWallEpoch), subcommitteeIdx)
node.broadcast(topic, msg)
proc broadcastSignedContributionAndProof*(
node: Eth2Node, msg: SignedContributionAndProof): Future[SendResult] =
node: Eth2Node, msg: SignedContributionAndProof):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getSyncCommitteeContributionAndProofTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, msg)
proc broadcastLightClientFinalityUpdate*(
node: Eth2Node, msg: ForkyLightClientFinalityUpdate):
Future[SendResult] =
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getLightClientFinalityUpdateTopic(
node.forkDigestAtEpoch(msg.contextEpoch))
node.broadcast(topic, msg)
proc broadcastLightClientOptimisticUpdate*(
node: Eth2Node, msg: ForkyLightClientOptimisticUpdate):
Future[SendResult] =
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let topic = getLightClientOptimisticUpdateTopic(
node.forkDigestAtEpoch(msg.contextEpoch))
node.broadcast(topic, msg)

View File

@ -557,7 +557,7 @@ proc createSendProc*(msg: Message,
def[3][0] = if procType == nnkMacroDef:
ident "untyped"
elif msg.kind == msgRequest and not isRawSender:
Fut(msg.requestResultType)
ident "auto"
elif msg.kind == msgHandshake and not isRawSender:
Fut(msg.recName)
else:
@ -652,12 +652,17 @@ proc useStandardBody*(sendProc: SendProc,
sendProc.setBody quote do:
mixin init, WriterType, beginRecord, endRecord, getOutput
let `msgBytes` =
try:
var `outputStream` = memoryOutput()
`preSerialization`
`serialization`
`postSerialization`
`tracing`
let `msgBytes` = getOutput(`outputStream`)
getOutput(`outputStream`)
except IOError:
raiseAssert "memoryOutput doesn't raise IOError actually"
`sendCall`
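The `raiseAssert` follows a pattern used throughout this PR: when an exception an API declares cannot actually occur for the call at hand (an in-memory stream raising `IOError`, `one()` on a non-empty list raising `ValueError`), it is discharged locally instead of widening the enclosing `raises` list. A trivial standalone illustration, not taken from the diff:

```nim
import std/strutils

proc defaultPort(): int {.raises: [].} =
  try:
    parseInt("9000")    # constant input: ValueError cannot actually happen
  except ValueError:
    raiseAssert "literal is always a valid integer"

echo defaultPort()
```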
proc correctSerializerProcParams(params: NimNode) =

View File

@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent =
pool.outNotFullEvent
proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
filter: set[PeerType]) {.async.} =
filter: set[PeerType]) {.async: (raises: [CancelledError]).} =
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
var fut1 = incomingEvent(eventType).wait()
var fut2 = outgoingEvent(eventType).wait()
try:
try:
discard await one(fut1, fut2)
except ValueError:
raiseAssert "one precondition satisfied"
if fut1.finished():
if not(fut2.finished()):
await fut2.cancelAndWait()
@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
outgoingEvent(eventType).clear()
proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotEmptyEvent, filter)
proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotFullEvent, filter)
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
@ -451,7 +454,7 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B],
{PeerType.Outgoing}
proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
peerType: PeerType) {.async.} =
peerType: PeerType) {.async: (raises: [CancelledError]).} =
## This procedure will block until ``pool`` will have an empty space for peer
## of type ``peerType``.
let mask = pool.getPeerSpaceMask(peerType)
@ -459,7 +462,7 @@ proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
await pool.waitNotFullEvent(mask)
proc addPeer*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): Future[PeerStatus] {.async.} =
peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} =
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
##
## This procedure will wait for an empty space in PeerPool ``pool``, if
@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B],
proc acquire*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[A] {.async.} =
PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} =
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
mixin getKey
doAssert(filter != {}, "Filter must not be empty")
@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} =
proc acquire*[A, B](pool: PeerPool[A, B],
number: int,
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[seq[A]] {.async.} =
PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} =
## Acquire ``number`` number of peers from PeerPool ``pool``, which match the
## filter ``filter``.
doAssert(filter != {}, "Filter must not be empty")
@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) =
pool.acqIncPeersCount = 0
pool.acqOutPeersCount = 0
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} =
## Performs "safe" clear. Safe means that it first acquires all the peers
## in PeerPool, and only after that it will reset storage.
var acquired = newSeq[A]()

View File

@ -110,7 +110,7 @@ proc checkStatusMsg(state: PeerSyncNetworkState, status: StatusMsg):
proc handleStatus(peer: Peer,
state: PeerSyncNetworkState,
theirStatus: StatusMsg): Future[bool] {.gcsafe.}
theirStatus: StatusMsg): Future[bool] {.async: (raises: [CancelledError]).}
{.pop.} # TODO fix p2p macro for raises
@ -118,7 +118,7 @@ p2pProtocol PeerSync(version = 1,
networkState = PeerSyncNetworkState,
peerState = PeerSyncPeerState):
onPeerConnected do (peer: Peer, incoming: bool) {.async.}:
onPeerConnected do (peer: Peer, incoming: bool) {.async: (raises: [CancelledError]).}:
debug "Peer connected",
peer, peerId = shortLog(peer.peerId), incoming
# Per the eth2 protocol, whoever dials must send a status message when
@ -155,7 +155,7 @@ p2pProtocol PeerSync(version = 1,
proc ping(peer: Peer, value: uint64): uint64
{.libp2pProtocol("ping", 1).} =
return peer.network.metadata.seq_number
peer.network.metadata.seq_number
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2
proc getMetaData(peer: Peer): uint64
@ -164,10 +164,9 @@ p2pProtocol PeerSync(version = 1,
proc getMetadata_v2(peer: Peer): altair.MetaData
{.libp2pProtocol("metadata", 2).} =
return peer.network.metadata
peer.network.metadata
proc goodbye(peer: Peer,
reason: uint64)
proc goodbye(peer: Peer, reason: uint64)
{.async, libp2pProtocol("goodbye", 1).} =
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
@ -178,7 +177,8 @@ proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
proc handleStatus(peer: Peer,
state: PeerSyncNetworkState,
theirStatus: StatusMsg): Future[bool] {.async.} =
theirStatus: StatusMsg): Future[bool]
{.async: (raises: [CancelledError]).} =
let
res = checkStatusMsg(state, theirStatus)
@ -195,21 +195,16 @@ proc handleStatus(peer: Peer,
await peer.handlePeer()
true
proc updateStatus*(peer: Peer): Future[bool] {.async.} =
proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]).} =
## Request `status` of remote peer ``peer``.
let
nstate = peer.networkState(PeerSync)
ourStatus = getCurrentStatus(nstate)
theirStatus =
(await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)).valueOr:
return false
let theirFut = awaitne peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
if theirFut.failed():
return false
else:
let theirStatus = theirFut.read()
if theirStatus.isOk:
return await peer.handleStatus(nstate, theirStatus.get())
else:
return false
await peer.handleStatus(nstate, theirStatus)
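`valueOr` does the heavy lifting in the new `updateStatus`. A self-contained sketch of the same shape (illustrative names, plain `int`/`string` instead of the real status types):

```nim
import chronos, results

proc fetchStatus(): Future[Result[int, string]] {.
    async: (raises: [CancelledError]).} =
  await sleepAsync(1.milliseconds)
  return ok(42)

proc updateExample(): Future[bool] {.async: (raises: [CancelledError]).} =
  # `valueOr` unpacks the Ok value or runs the else branch (where `error` is
  # in scope), replacing the old awaitne/failed()/read() dance.
  let status = (await fetchStatus()).valueOr:
    return false
  return status > 0

echo waitFor(updateExample())
```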
proc getHeadSlot*(peer: Peer): Slot =
## Returns head slot for specific peer ``peer``.

View File

@ -351,7 +351,7 @@ proc initFullNode(
blobQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] =
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
@ -360,27 +360,23 @@ proc initFullNode(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool):
Future[Result[void, VerifierError]] =
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
withBlck(signedBlock):
when typeof(forkyBlck).kind >= ConsensusFork.Deneb:
when consensusFork >= ConsensusFork.Deneb:
if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have
# to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
Future.completed(
Result[void, VerifierError].err(VerifierError.UnviableFork),
"rmanBlockVerifier")
err(VerifierError.UnviableFork)
else:
Future.completed(
Result[void, VerifierError].err(VerifierError.MissingParent),
"rmanBlockVerifier")
err(VerifierError.MissingParent)
else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)
else:
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(BlobSidecars),
maybeFinalized = maybeFinalized)

View File

@ -31,7 +31,7 @@ type
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject):
Future[NetRes[MsgType]] {.async.} =
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)

View File

@ -39,7 +39,7 @@ const
type
BlockVerifierFn* =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
InhibitFn* = proc: bool {.gcsafe, raises:[].}
RequestManager* = object
@ -49,8 +49,8 @@ type
quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine
blockVerifier: BlockVerifierFn
blockLoopFuture: Future[void]
blobLoopFuture: Future[void]
blockLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void].Raising([CancelledError])
func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier],
return false
true
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} =
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
var peer: Peer
try:
peer = await rman.network.peerPool.acquire()
@ -171,19 +171,13 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.}
peer = peer, blocks = shortLog(items), err = blocks.error()
peer.updateScore(PeerScoreNoValues)
except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blocks by root", exc = exc.msg,
items = shortLog(items), peer = peer, peer_score = peer.getScore()
raise exc
finally:
if not(isNil(peer)):
rman.network.peerPool.release(peer)
proc fetchBlobsFromNetwork(self: RequestManager,
idList: seq[BlobIdentifier]) {.async.} =
idList: seq[BlobIdentifier])
{.async: (raises: [CancelledError]).} =
var peer: Peer
try:
@ -191,7 +185,7 @@ proc fetchBlobsFromNetwork(self: RequestManager,
debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList),
peer_score = peer.getScore()
let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList))
let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList)
if blobs.isOk:
let ublobs = blobs.get()
@ -219,18 +213,11 @@ proc fetchBlobsFromNetwork(self: RequestManager,
peer = peer, blobs = shortLog(idList), err = blobs.error()
peer.updateScore(PeerScoreNoValues)
except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blobs by root", exc = exc.msg,
idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
raise exc
finally:
if not(isNil(peer)):
self.network.peerPool.release(peer)
proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
@ -245,10 +232,9 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
continue
debug "Requesting detected missing blocks", blocks = shortLog(blocks)
try:
let start = SyncMoment.now(0)
var workers: array[PARALLEL_REQUESTS, Future[void]]
var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.requestBlocksByRoot(blocks)
@ -257,22 +243,9 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
let finish = SyncMoment.now(uint64(len(blocks)))
var succeed = 0
for worker in workers:
if worker.completed():
inc(succeed)
debug "Request manager block tick", blocks = shortLog(blocks),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)
except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager block loop", exc = exc.msg
proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
let
wallTime = rman.getBeaconTime()
@ -308,8 +281,7 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
rman.quarantine[].removeBlobless(blobless)
fetches
proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
@ -320,31 +292,18 @@ proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
let fetches = rman.getMissingBlobs()
if fetches.len > 0:
debug "Requesting detected missing blobs", blobs = shortLog(fetches)
try:
let start = SyncMoment.now(0)
var workers: array[PARALLEL_REQUESTS, Future[void]]
var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchBlobsFromNetwork(fetches)
await allFutures(workers)
let finish = SyncMoment.now(uint64(len(fetches)))
var succeed = 0
for worker in workers:
if worker.finished() and not(worker.failed()):
inc(succeed)
debug "Request manager blob tick",
blobs_count = len(fetches),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)
except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager blob loop", exc = exc.msg
proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
rman.blockLoopFuture = rman.requestManagerBlockLoop()
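Both loops now keep their workers in a fixed array of strict-raises futures and wait with `allFutures`, which is what made the per-worker success/failure bookkeeping removable. A reduced sketch (illustrative names):

```nim
import chronos

const Parallel = 3

proc fetchOne(i: int) {.async: (raises: [CancelledError]).} =
  await sleepAsync(i.milliseconds)

proc fetchBatch() {.async: (raises: [CancelledError]).} =
  var workers: array[Parallel, Future[void].Raising([CancelledError])]
  for i in 0 ..< Parallel:
    workers[i] = fetchOne(i)
  # With strict raises a worker can only end by completing or by being
  # cancelled, so waiting for all of them needs no per-future error handling.
  await allFutures(workers)

waitFor fetchBatch()
```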

View File

@ -43,7 +43,7 @@ type
NoMonitor
SyncWorker*[A, B] = object
future: Future[void]
future: Future[void].Raising([CancelledError])
status: SyncWorkerStatus
SyncManager*[A, B] = ref object
@ -158,8 +158,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
res.initQueue()
res
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.async.} =
proc getBlocks[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.
async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==`
logScope:
@ -171,21 +172,8 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blocks from peer", request = req
try:
let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
if res.isErr():
debug "Error, while reading getBlocks response", request = req,
error = $res.error()
return
return res
except CancelledError:
debug "Interrupt, while waiting getBlocks response", request = req
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", request = req,
errName = exc.name, errMsg = exc.msg
return
beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
let wallEpoch = man.getLocalWallSlot().epoch
@ -194,8 +182,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest
): Future[BlobSidecarsRes] {.async.} =
req: SyncRequest): Future[BlobSidecarsRes]
{.async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==`
logScope:
@ -207,21 +195,7 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blobs sidecars from peer", request = req
try:
let res = await blobSidecarsByRange(peer, req.slot, req.count)
if res.isErr():
debug "Error, while reading blobSidecarsByRange response", request = req,
error = $res.error()
return
return res
except CancelledError:
debug "Interrupt, while waiting blobSidecarsByRange response", request = req
return
except CatchableError as exc:
debug "Error, while waiting blobSidecarsByRange response", request = req,
errName = exc.name, errMsg = exc.msg
return
blobSidecarsByRange(peer, req.slot, req.count)
proc remainingSlots(man: SyncManager): uint64 =
let
@ -282,7 +256,8 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
ok()
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
{.async: (raises: [CancelledError]).} =
logScope:
peer_score = peer.getScore()
peer_speed = peer.netKbps()
@ -322,18 +297,12 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
trace "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot
try:
let res = await peer.updateStatus()
if not(res):
if not await peer.updateStatus():
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting",
peer_head_slot = peerSlot
return
except CatchableError as exc:
debug "Unexpected exception while updating peer's status",
peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg
return
let newPeerSlot = peer.getHeadSlot()
if peerSlot >= newPeerSlot:
@ -419,24 +388,22 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
man.workers[index].status = SyncWorkerStatus.Downloading
try:
let blocks = await man.getBlocks(peer, req)
if blocks.isErr():
let blocks = (await man.getBlocks(peer, req)).valueOr:
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blocks on request", request = req
return
let blockData = blocks.get().asSeq()
let blockSmap = getShortMap(req, blockData)
debug "Received blocks on request", blocks_count = len(blockData),
let blockSmap = getShortMap(req, blocks.asSeq())
debug "Received blocks on request", blocks_count = len(blocks),
blocks_map = blockSmap, request = req
let slots = mapIt(blockData, it[].slot)
let slots = mapIt(blocks, it[].slot)
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blocks sequence is not in requested range",
blocks_count = len(blockData), blocks_map = blockSmap,
blocks_count = len(blocks), blocks_map = blockSmap,
request = req
return
@ -448,48 +415,43 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
let blobData =
if man.shouldGetBlobs(req.slot.epoch):
let blobs = await man.getBlobSidecars(peer, req)
if blobs.isErr():
let blobs = (await man.getBlobSidecars(peer, req)).valueOr:
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blobs on request", request = req
return
let blobData = blobs.get().asSeq()
let blobSmap = getShortMap(req, blobData)
debug "Received blobs on request", blobs_count = len(blobData),
let blobSmap = getShortMap(req, blobs.asSeq())
debug "Received blobs on request", blobs_count = len(blobs),
blobs_map = blobSmap, request = req
if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot)
if len(blobs) > 0:
let slots = mapIt(blobs, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is not in requested range",
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
blobs_count = len(blobs), blobs_map = blobSmap,
request = req
return
let groupedBlobs = groupBlobs(req, blockData, blobData)
let groupedBlobs = groupBlobs(req, blocks.asSeq(), blobs.asSeq())
if groupedBlobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
info "Received blobs sequence is inconsistent",
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
blobs_map = blobSmap, request = req, msg=groupedBlobs.error()
return
if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is invalid",
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req,
msg = checkRes.error
blobs_map = blobSmap, request = req, msg=groupedBlobs.error()
return
Opt.some(groupedBlobs.get())
else:
Opt.none(seq[BlobSidecars])
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
if len(blocks) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()):
# The sync protocol does not distinguish between:
# - All requested slots are empty
@ -513,16 +475,10 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
# TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, blockData, blobData, maybeFinalized, proc() =
await man.queue.push(req, blocks.asSeq(), blobData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
except CatchableError as exc:
man.queue.push(req)
debug "Unexpected exception while receiving blocks", request = req,
errName = exc.name, errMsg = exc.msg
return
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} =
mixin getKey, getScore, getHeadSlot
logScope:
@ -533,10 +489,10 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
debug "Starting syncing worker"
while true:
var peer: A = nil
let doBreak =
try:
while true:
man.workers[index].status = SyncWorkerStatus.Sleeping
# This event is going to be set until we are not in sync with network
await man.notInSyncEvent.wait()
@ -544,19 +500,10 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
peer = await man.pool.acquire()
await man.syncStep(index, peer)
man.pool.release(peer)
false
except CancelledError:
peer = nil
finally:
if not(isNil(peer)):
man.pool.release(peer)
true
except CatchableError as exc:
debug "Unexpected exception in sync worker",
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(),
errName = exc.name, errMsg = exc.msg
true
if doBreak:
break
debug "Sync worker stopped"
@ -593,34 +540,10 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
map[i] = ch
(map, sleeping, waiting, pending)
proc guardTask[A, B](man: SyncManager[A, B]) {.async.} =
logScope:
index = index
sync_ident = man.ident
direction = man.direction
topics = "syncman"
var pending: array[SyncWorkersCount, Future[void]]
proc startWorkers[A, B](man: SyncManager[A, B]) =
# Starting all the synchronization workers.
for i in 0 ..< len(man.workers):
let future = syncWorker[A, B](man, i)
man.workers[i].future = future
pending[i] = future
# Wait for synchronization worker's failure and replace it with new one.
while true:
let failFuture = await one(pending)
let index = pending.find(failFuture)
if failFuture.failed():
warn "Synchronization worker stopped working unexpectedly with an error",
errName = failFuture.error.name, errMsg = failFuture.error.msg
else:
warn "Synchronization worker stopped working unexpectedly without error"
let future = syncWorker[A, B](man, index)
man.workers[index].future = future
pending[index] = future
man.workers[i].future = syncWorker[A, B](man, i)
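With workers that can only raise `CancelledError`, the guard task that used to restart crashed workers has nothing left to guard, so `startWorkers` simply launches them. A sketch of why the failure handling disappears (illustrative names):

```nim
import chronos

proc acquireJob(): Future[int] {.async: (raises: [CancelledError]).} =
  await sleepAsync(1.milliseconds)
  return 1

proc worker() {.async: (raises: [CancelledError]).} =
  # Nothing to catch in the loop body: the only way out is cancellation,
  # which propagates and ends the worker cleanly.
  while true:
    discard await acquireJob()

let workerFut = worker()
waitFor sleepAsync(10.milliseconds)
waitFor workerFut.cancelAndWait()
```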
proc toTimeLeftString*(d: Duration): string =
if d == InfiniteDuration:
@ -648,11 +571,9 @@ proc toTimeLeftString*(d: Duration): string =
res = res & "00m"
res
proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void],
proc syncClose[A, B](man: SyncManager[A, B],
speedTaskFut: Future[void]) {.async.} =
var pending: seq[FutureBase]
if not(guardTaskFut.finished()):
pending.add(guardTaskFut.cancelAndWait())
if not(speedTaskFut.finished()):
pending.add(speedTaskFut.cancelAndWait())
for worker in man.workers:
@ -669,11 +590,11 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
mixin getKey, getScore
var pauseTime = 0
var guardTaskFut = man.guardTask()
man.startWorkers()
debug "Synchronization loop started"
proc averageSpeedTask() {.async.} =
proc averageSpeedTask() {.async: (raises: [CancelledError]).} =
while true:
# Reset sync speeds between each loss-of-sync event
man.avgSyncSpeed = 0
@ -703,7 +624,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
stamp = newStamp
var averageSpeedTaskFut = averageSpeedTask()
let averageSpeedTaskFut = averageSpeedTask()
while true:
let wallSlot = man.getLocalWallSlot()
@ -788,7 +709,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
of SyncQueueKind.Forward:
if man.inProgress:
if SyncManagerFlag.NoMonitor in man.flags:
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
await man.syncClose(averageSpeedTaskFut)
man.inProgress = false
debug "Forward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot,
@ -809,10 +730,8 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
of SyncQueueKind.Backward:
# Backward syncing is going to be executed only once, so we exit loop
# and stop all pending tasks which belongs to this instance (sync
# workers, guard task and speed calculation task).
# We first need to cancel and wait for guard task, because otherwise
# it will be able to restore cancelled workers.
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
# workers, speed calculation task).
await man.syncClose(averageSpeedTaskFut)
man.inProgress = false
debug "Backward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot,

View File

@ -35,17 +35,14 @@ type
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
BlobIdentifierList* = List[BlobIdentifier, Limit (MAX_REQUEST_BLOB_SIDECARS)]
template readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type ForkySignedBeaconBlock):
Future[NetRes[MsgType]] =
readChunkPayload(conn, peer, MsgType)
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
Future[NetRes[MsgType]] {.async.} =
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CancelledError as exc:
raise exc
except CatchableError:
return neterr UnexpectedEOF
@ -84,10 +81,12 @@ proc readChunkPayload*(
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref BlobSidecar)):
Future[NetRes[MsgType]] {.async.} =
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CancelledError as exc:
raise exc
except CatchableError:
return neterr UnexpectedEOF

View File

@ -27,7 +27,7 @@ type
ProcessingCallback* = proc() {.gcsafe, raises: [].}
BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
SyncQueueKind* {.pure.} = enum
Forward, Backward
@ -50,7 +50,7 @@ type
item*: T
SyncWaiter* = ref object
future: Future[void]
future: Future[void].Raising([CancelledError])
reset: bool
RewindPoint = object
@ -311,9 +311,9 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
if not(item.future.finished()):
item.future.complete()
proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} =
## Create new waiter and wait for completion from `wakeupWaiters()`.
var waitfut = newFuture[void]("SyncQueue.waitForChanges")
let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges")
let waititem = SyncWaiter(future: waitfut)
sq.waiters.add(waititem)
try:
@ -322,7 +322,7 @@ proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
finally:
sq.waiters.delete(sq.waiters.find(waititem))
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} =
## This procedure will perform wakeupWaiters(true) and blocks until last
## waiter will be awakened.
var waitChanges = sq.waitForChanges()
@ -333,7 +333,7 @@ proc clearAndWakeup*[T](sq: SyncQueue[T]) =
sq.pending.clear()
sq.wakeupWaiters(true)
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} =
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} =
## Perform reset of all the blocked waiters in SyncQueue.
##
## We adding one more waiter to the waiters sequence and
@ -610,7 +610,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[BlobSidecars]],
maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil) {.async.} =
processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} =
logScope:
sync_ident = sq.ident
topics = "syncman"

View File

@ -254,5 +254,5 @@ build/block_sim --slots=384 --validators=20000 --attesterRatio=0.66
## Sync from a specific peer
```sh
build/nimbus_beacon_node --discv5:off --tcp-port=9876 --direct-peer="/ip4/127.0.0.1/tcp/9000/p2p/$(curl -s -X 'GET' 'http://localhost:5052/eth/v1/node/identity' -H 'accept: application/json' | jq -r .data.peer_id)"
build/nimbus_beacon_node --no-el --discv5:off --tcp-port=9876 --direct-peer="/ip4/127.0.0.1/tcp/9000/p2p/$(curl -s -X 'GET' 'http://localhost:5052/eth/v1/node/identity' -H 'accept: application/json' | jq -r .data.peer_id)"
```

View File

@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
# the BlockProcessor and this test
proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars],
maybeFinalized: bool):
Future[Result[void, VerifierError]] =
let fut = newFuture[Result[void, VerifierError]]()
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init()
try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut))
except CatchableError as exc: raiseAssert exc.msg
return fut
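The test helper builds the result future by hand and completes it later, the same shape `resfut` has in the block processor. A self-contained sketch of that pattern, with a stand-in error enum rather than the real `VerifierError`:

```nim
import chronos, results

type VerifyError = enum Invalid    # stand-in for the real VerifierError

var pending: seq[Future[Result[void, VerifyError]].Raising([CancelledError])]

# raw: the future is created and handed out; another component completes it.
proc submit(): Future[Result[void, VerifyError]] {.
    async: (raises: [CancelledError], raw: true).} =
  let fut = Future[Result[void, VerifyError]].Raising([CancelledError]).init("submit")
  pending.add fut
  fut

proc drain() =
  for fut in pending:
    fut.complete(Result[void, VerifyError].ok())
  pending.setLen(0)

let res = submit()
drain()
echo waitFor(res)
```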