diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index d63086987..0f936d42f 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -61,7 +61,7 @@ type blobs*: Opt[BlobSidecars] maybeFinalized*: bool ## The block source claims the block has been finalized already - resfut*: Future[Result[void, VerifierError]] + resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) queueTick*: Moment # Moment when block was enqueued validationDur*: Duration # Time it took to perform gossip validation src*: MsgSource @@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor, proc enqueueBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], - resfut: Future[Result[void, VerifierError]] = nil, + resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, maybeFinalized = false, validationDur = Duration()) = withBlck(blck): @@ -756,7 +756,7 @@ proc storeBlock( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] = + validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: # There is no backpressure here - producers must wait for `resfut` to diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index b5744f766..60fb67189 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -74,7 +74,7 @@ type seenThreshold*: chronos.Duration connQueue: AsyncQueue[PeerAddr] seenTable: Table[PeerId, SeenItem] - connWorkers: seq[Future[void]] + connWorkers: seq[Future[void].Raising([CancelledError])] connTable: HashSet[PeerId] forkId*: ENRForkID discoveryForkId*: ENRForkID @@ -83,8 +83,8 @@ type peers*: Table[PeerId, Peer] directPeers*: DirectPeers validTopics: HashSet[string] - peerPingerHeartbeatFut: Future[void] - peerTrimmerHeartbeatFut: Future[void] + peerPingerHeartbeatFut: Future[void].Raising([CancelledError]) + peerTrimmerHeartbeatFut: Future[void].Raising([CancelledError]) cfg: RuntimeConfig getBeaconTime: GetBeaconTimeFn @@ -167,8 +167,8 @@ type PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [].} NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe, raises: [].} - OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe, raises: [].} - OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe, raises: [].} + OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.async: (raises: [CancelledError]).} + OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.async: (raises: [CancelledError]).} ThunkProc* = LPProtoHandler MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [].} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [].} @@ -183,9 +183,6 @@ type # erroneous request-specific responses. PeerScoreLow = 237 # 79 * 3 - PeerDisconnected* = object of CatchableError - reason*: DisconnectionReason - TransmissionError* = object of CatchableError Eth2NetworkingErrorKind* = enum @@ -207,6 +204,8 @@ type InvalidContextBytes ResponseChunkOverflow + UnknownError + Eth2NetworkingError = object case kind*: Eth2NetworkingErrorKind of ReceivedErrorResponse: @@ -340,14 +339,22 @@ func shortProtocolId(protocolId: string): string = proc openStream(node: Eth2Node, peer: Peer, - protocolId: string): Future[Connection] {.async.} = + protocolId: string): Future[NetRes[Connection]] + {.async: (raises: [CancelledError]).} = # When dialing here, we do not provide addresses - all new connection # attempts are handled via `connect` which also takes into account # reconnection timeouts - let - conn = await dial(node.switch, peer.peerId, protocolId) - - return conn + try: + ok await dial(node.switch, peer.peerId, protocolId) + except LPError as exc: + debug "Dialling failed", exc = exc.msg + neterr BrokenConnection + except CancelledError as exc: + raise exc + except CatchableError as exc: + # TODO remove once libp2p supports `raises` + warn "Unknown error when opening stream", exc = exc.msg + neterr UnknownError proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} @@ -523,8 +530,11 @@ proc addSeen(network: Eth2Node, peerId: PeerId, network.seenTable[peerId] = item proc disconnect*(peer: Peer, reason: DisconnectionReason, - notifyOtherPeer = false) {.async.} = - # TODO(zah): How should we notify the other peer? + notifyOtherPeer = false) {.async: (raises: [CancelledError]).} = + # Per the specification, we MAY send a disconnect reason to the other peer but + # we currently don't - the fact that we're disconnecting is obvious and the + # reason already known (wrong network is known from status message) or doesn't + # greatly matter for the listening side (since it can't be trusted anyway) try: if peer.connectionState notin {Disconnecting, Disconnected}: peer.connectionState = Disconnecting @@ -540,12 +550,16 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, SeenTablePenaltyError peer.network.addSeen(peer.peerId, seenTime) await peer.network.switch.disconnect(peer.peerId) - except CatchableError: - # We do not care about exceptions in disconnection procedure. - trace "Exception while disconnecting peer", peer = peer.peerId, - reason = reason + except CancelledError as exc: + raise exc + except CatchableError as exc: + # switch.disconnect shouldn't raise + warn "Unexpected error while disconnecting peer", + peer = peer.peerId, + reason = reason, + exc = exc.msg -proc releasePeer*(peer: Peer) = +proc releasePeer(peer: Peer) = ## Checks for peer's score and disconnects peer if score is less than ## `PeerScoreLowLimit`. if peer.connectionState notin {ConnectionState.Disconnecting, @@ -641,22 +655,33 @@ proc sendErrorResponse(peer: Peer, peer, responseCode, errMsg = formatErrorMsg(errMsg) conn.writeChunk(Opt.some responseCode, SSZ.encode(errMsg)) -proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte]) {.async.} = - var +proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte]) + {.async: (raises: [CancelledError]).} = + # Notifications are sent as a best effort, ie errors are not reported back + # to the caller + let deadline = sleepAsync RESP_TIMEOUT_DUR - streamFut = peer.network.openStream(peer, protocolId) + streamRes = awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline): + debug "Timeout while opening stream for notification", peer, protocolId + return - await streamFut or deadline + let stream = streamRes.valueOr: + debug "Could not open stream for notification", + peer, protocolId, error = streamRes.error + return - if not streamFut.finished: - await streamFut.cancelAndWait() - raise newException(TransmissionError, "Failed to open LibP2P stream") - - let stream = streamFut.read try: await stream.writeChunk(Opt.none ResponseCode, requestBytes) + except CancelledError as exc: + raise exc + except CatchableError as exc: + debug "Error while writing notification", peer, protocolId, exc = exc.msg finally: - await stream.close() + try: + await noCancel stream.close() + except CatchableError as exc: + warn "Unexpected error while closing notification stream", + peer, protocolId, exc = exc.msg proc sendResponseChunkBytesSZ( response: UntypedResponse, uncompressedLen: uint64, @@ -687,13 +712,17 @@ template sendUserHandlerResultAsChunkImpl*(stream: Connection, writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerResult)) proc uncompressFramedStream(conn: Connection, - expectedSize: int): Future[Result[seq[byte], cstring]] - {.async.} = + expectedSize: int): Future[Result[seq[byte], string]] + {.async: (raises: [CancelledError]).} = var header: array[framingHeader.len, byte] try: await conn.readExactly(addr header[0], header.len) except LPStreamEOFError, LPStreamIncompleteError: return err "Unexpected EOF before snappy header" + except CancelledError as exc: + raise exc + except CatchableError as exc: + return err "Unexpected error reading header: " & exc.msg if header != framingHeader: return err "Incorrect snappy header" @@ -712,6 +741,10 @@ proc uncompressFramedStream(conn: Connection, await conn.readExactly(addr frameHeader[0], frameHeader.len) except LPStreamEOFError, LPStreamIncompleteError: return err "Snappy frame header missing" + except CancelledError as exc: + raise exc + except CatchableError as exc: + return err "Unexpected error reading frame header: " & exc.msg let (id, dataLen) = decodeFrameHeader(frameHeader) @@ -726,6 +759,10 @@ proc uncompressFramedStream(conn: Connection, await conn.readExactly(addr frameData[0], dataLen) except LPStreamEOFError, LPStreamIncompleteError: return err "Incomplete snappy frame" + except CancelledError as exc: + raise exc + except CatchableError as exc: + return err "Unexpected error reading frame data: " & exc.msg if id == chunkCompressed: if dataLen < 6: # At least CRC + 2 bytes of frame data @@ -809,7 +846,8 @@ template gossipMaxSize(T: untyped): uint32 = static: doAssert maxSize <= GOSSIP_MAX_SIZE maxSize.uint32 -proc readVarint2(conn: Connection): Future[NetRes[uint64]] {.async.} = +proc readVarint2(conn: Connection): Future[NetRes[uint64]] {. + async: (raises: [CancelledError]).} = try: ok await conn.readVarint() except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError @@ -820,9 +858,15 @@ proc readVarint2(conn: Connection): Future[NetRes[uint64]] {.async.} = neterr UnexpectedEOF except InvalidVarintError: neterr InvalidSizePrefix + except CancelledError as exc: + raise exc + except CatchableError as exc: + warn "Unexpected error", exc = exc.msg + neterr UnknownError proc readChunkPayload*(conn: Connection, peer: Peer, - MsgType: type): Future[NetRes[MsgType]] {.async.} = + MsgType: type): Future[NetRes[MsgType]] + {.async: (raises: [CancelledError]).} = let sm = now(chronos.Moment) size = ? await readVarint2(conn) @@ -844,46 +888,49 @@ proc readChunkPayload*(conn: Connection, peer: Peer, # not be significant. peer.updateNetThroughput(now(chronos.Moment) - sm, uint64(10 + size)) - ok SSZ.decode(data, MsgType) + try: + ok SSZ.decode(data, MsgType) + except SerializationError: + neterr InvalidSszBytes proc readResponseChunk( conn: Connection, peer: Peer, MsgType: typedesc): - Future[NetRes[MsgType]] {.async.} = + Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = mixin readChunkPayload + var responseCodeByte: byte try: - var responseCodeByte: byte - try: - await conn.readExactly(addr responseCodeByte, 1) - except LPStreamEOFError, LPStreamIncompleteError: - return neterr PotentiallyExpectedEOF - - static: assert ResponseCode.low.ord == 0 - if responseCodeByte > ResponseCode.high.byte: - return neterr InvalidResponseCode - - let responseCode = ResponseCode responseCodeByte - case responseCode: - of InvalidRequest, ServerError, ResourceUnavailable: - let - errorMsgChunk = await readChunkPayload(conn, peer, ErrorMsg) - errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value - else: return err(errorMsgChunk.error) - errorMsgStr = toPrettyString(errorMsg.asSeq) - debug "Error response from peer", responseCode, errMsg = errorMsgStr - return err Eth2NetworkingError(kind: ReceivedErrorResponse, - responseCode: responseCode, - errorMsg: errorMsgStr) - of Success: - discard - - return await readChunkPayload(conn, peer, MsgType) - + await conn.readExactly(addr responseCodeByte, 1) except LPStreamEOFError, LPStreamIncompleteError: - return neterr UnexpectedEOF + return neterr PotentiallyExpectedEOF + except CancelledError as exc: + raise exc + except CatchableError as exc: + warn "Unexpected error", exc = exc.msg + return neterr UnknownError + + static: assert ResponseCode.low.ord == 0 + if responseCodeByte > ResponseCode.high.byte: + return neterr InvalidResponseCode + + let responseCode = ResponseCode responseCodeByte + case responseCode: + of InvalidRequest, ServerError, ResourceUnavailable: + let + errorMsg = ? await readChunkPayload(conn, peer, ErrorMsg) + errorMsgStr = toPrettyString(errorMsg.asSeq) + debug "Error response from peer", responseCode, errMsg = errorMsgStr + return err Eth2NetworkingError(kind: ReceivedErrorResponse, + responseCode: responseCode, + errorMsg: errorMsgStr) + of Success: + discard + + return await readChunkPayload(conn, peer, MsgType) proc readResponse(conn: Connection, peer: Peer, - MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} = + MsgType: type, timeout: Duration): Future[NetRes[MsgType]] + {.async: (raises: [CancelledError]).} = when MsgType is List: type E = MsgType.T var results: MsgType @@ -897,7 +944,7 @@ proc readResponse(conn: Connection, peer: Peer, let nextFut = conn.readResponseChunk(peer, E) if not await nextFut.withTimeout(timeout): return neterr(ReadResponseTimeout) - let nextRes = nextFut.read() + let nextRes = await nextFut if nextRes.isErr: if nextRes.error.kind == PotentiallyExpectedEOF: trace "EOF chunk", conn, err = nextRes.error @@ -914,15 +961,19 @@ proc readResponse(conn: Connection, peer: Peer, let nextFut = conn.readResponseChunk(peer, MsgType) if not await nextFut.withTimeout(timeout): return neterr(ReadResponseTimeout) - return nextFut.read() + return await nextFut # Guaranteed to complete without waiting proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte], ResponseMsg: type, timeout: Duration): Future[NetRes[ResponseMsg]] - {.async.} = - let deadline = sleepAsync timeout - let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId), - deadline): return neterr StreamOpenTimeout + {.async: (raises: [CancelledError]).} = + let + deadline = sleepAsync timeout + streamRes = + awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline): + return neterr StreamOpenTimeout + stream = ?streamRes + try: # Send the request # Some clients don't want a length sent for empty requests @@ -942,13 +993,18 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte], peer.updateScore(PeerScoreInvalidRequest) else: peer.updateScore(PeerScorePoorRequest) - return res - except SerializationError as exc: - # Yay for both exceptions and results! - peer.updateScore(PeerScoreInvalidRequest) + res + except CancelledError as exc: raise exc + except CatchableError: + peer.updateScore(PeerScorePoorRequest) + neterr BrokenConnection finally: - await stream.closeWithEOF() + try: + await noCancel stream.closeWithEOF() + except CatchableError as exc: + warn "Unexpected error while closing stream", + peer, protocolId, exc = exc.msg proc init*(T: type MultipleChunksResponse, peer: Peer, conn: Connection): T = T(UntypedResponse(peer: peer, stream: conn)) @@ -988,7 +1044,7 @@ template sendSSZ*[M]( doAssert UntypedResponse(r).writtenChunks == 0 sendResponseChunk(UntypedResponse(r), val, contextBytes) -proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} = +proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async: (raises: [CancelledError]).} = # Loop down serially because it's easier to reason about the connection state # when there are fewer async races, specially during setup for protocol in peer.network.protocols: @@ -1036,7 +1092,7 @@ proc implementSendProcBody(sendProc: SendProc) = proc handleIncomingStream(network: Eth2Node, conn: Connection, protocolId: string, - MsgType: type) {.async.} = + MsgType: type) {.async: (raises: [CancelledError]).} = mixin callUserHandler, RecType type MsgRec = RecType(MsgType) @@ -1057,7 +1113,6 @@ proc handleIncomingStream(network: Eth2Node, # We got incoming stream request while disconnected or disconnecting. debug "Got incoming request from disconnected peer", peer = peer, message = msgName - await conn.closeWithEOF() return of Connecting: # We got incoming stream request while handshake is not yet finished, @@ -1114,10 +1169,6 @@ proc handleIncomingStream(network: Eth2Node, errorMsgLit "Request full data not sent in time") return - except SerializationError as err: - nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) - returnInvalidRequest err.formatMsg("msg") - finally: # The request quota is shared between all requests - it represents the # cost to perform a service on behalf of a client and is incurred @@ -1182,27 +1233,33 @@ proc handleIncomingStream(network: Eth2Node, of ResponseChunkOverflow: (InvalidRequest, errorMsgLit "Too many chunks in response") + of UnknownError: + (InvalidRequest, errorMsgLit "Unknown error while processing request") + await sendErrorResponse(peer, conn, responseCode, errMsg) return try: # logReceivedMsg(peer, MsgType(msg.get)) await callUserHandler(MsgType, peer, conn, msg.get) - except InvalidInputsError as err: + except InvalidInputsError as exc: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) - returnInvalidRequest err.msg - except ResourceUnavailableError as err: - returnResourceUnavailable err.msg - except CatchableError as err: + returnInvalidRequest exc.msg + except ResourceUnavailableError as exc: + returnResourceUnavailable exc.msg + except CatchableError as exc: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) - await sendErrorResponse(peer, conn, ServerError, ErrorMsg err.msg.toBytes) + await sendErrorResponse(peer, conn, ServerError, ErrorMsg exc.msg.toBytes) - except CatchableError as err: + except CatchableError as exc: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) - debug "Error processing an incoming request", err = err.msg, msgName + debug "Error processing an incoming request", exc = exc.msg, msgName finally: - await conn.closeWithEOF() + try: + await noCancel conn.closeWithEOF() + except CatchableError as exc: + warn "Unexpected error while closing incoming connection", exc = exc.msg releasePeer(peer) proc toPeerAddr*(r: enr.TypedRecord, @@ -1264,7 +1321,7 @@ proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool = else: true -proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} = +proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async: (raises: [CancelledError]).} = ## Establish connection with remote peer identified by address ``peerAddr``. logScope: peer = peerAddr.peerId @@ -1299,7 +1356,7 @@ proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} = inc nbc_failed_dials node.addSeen(peerAddr.peerId, SeenTableTimeDeadPeer) -proc connectWorker(node: Eth2Node, index: int) {.async.} = +proc connectWorker(node: Eth2Node, index: int) {.async: (raises: [CancelledError]).} = debug "Connection worker started", index = index while true: # This loop will never produce HIGH CPU usage because it will wait @@ -1575,7 +1632,7 @@ proc resolvePeer(peer: Peer) = nbc_resolve_time.observe(delay.toFloatSeconds()) debug "Peer's ENR recovered", delay -proc handlePeer*(peer: Peer) {.async.} = +proc handlePeer*(peer: Peer) {.async: (raises: [CancelledError]).} = let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) case res: of PeerStatus.LowScoreError, PeerStatus.NoSpaceError: @@ -1607,7 +1664,9 @@ proc handlePeer*(peer: Peer) {.async.} = debug "Peer successfully connected", peer = peer, connections = peer.connections -proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} = +proc onConnEvent( + node: Eth2Node, peerId: PeerId, event: ConnEvent) {. + async: (raises: [CancelledError]).} = let peer = node.getPeer(peerId) case event.kind of ConnEventKind.Connected: @@ -1632,7 +1691,12 @@ proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} = # we might end up here debug "Got connection attempt from peer that we are disconnecting", peer = peerId - await node.switch.disconnect(peerId) + try: + await node.switch.disconnect(peerId) + except CancelledError as exc: + raise exc + except CatchableError as exc: + warn "Unexpected error while disconnecting peer", exc = exc.msg return of None: # We have established a connection with the new peer. @@ -1768,23 +1832,22 @@ proc startListening*(node: Eth2Node) {.async.} = if node.discoveryEnabled: try: node.discovery.open() - except CatchableError as err: + except CatchableError as exc: fatal "Failed to start discovery service. UDP port may be already in use", - err = err.msg + exc = exc.msg quit 1 try: await node.switch.start() - except CatchableError as err: + except CatchableError as exc: fatal "Failed to start LibP2P transport. TCP port may be already in use", - err = err.msg + exc = exc.msg quit 1 -proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.} -proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.} - -proc start*(node: Eth2Node) {.async.} = +proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.async: (raises: [CancelledError]).} +proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.async: (raises: [CancelledError]).} +proc start*(node: Eth2Node) {.async: (raises: [CancelledError]).} = proc onPeerCountChanged() = trace "Number of peers has been changed", length = len(node.peerPool) nbc_peers.set int64(len(node.peerPool)) @@ -1809,7 +1872,7 @@ proc start*(node: Eth2Node) {.async.} = node.peerPingerHeartbeatFut = node.peerPingerHeartbeat() node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat() -proc stop*(node: Eth2Node) {.async.} = +proc stop*(node: Eth2Node) {.async: (raises: [CancelledError]).} = # Ignore errors in futures, since we're shutting down (but log them on the # TRACE level, if a timeout is reached). var waitedFutures = @@ -1980,17 +2043,14 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = import ./peer_protocol export peer_protocol -proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} = +proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async: (raises: [CancelledError]).} = trace "updating peer metadata", peerId - var peer = node.getPeer(peerId) - - #getMetaData can fail with an exception - let newMetadata = - try: - tryGet(await peer.getMetadata_v2()) - except CatchableError as exc: - debug "Failed to retrieve metadata from peer!", peerId, msg=exc.msg + let + peer = node.getPeer(peerId) + newMetadataRes = await peer.getMetadata_v2() + newMetadata = newMetadataRes.valueOr: + debug "Failed to retrieve metadata from peer!", peerId, error = newMetadataRes.error peer.failedMetadataRequests.inc() return @@ -2003,7 +2063,7 @@ const MetadataRequestFrequency = 30.minutes MetadataRequestMaxFailures = 3 -proc peerPingerHeartbeat(node: Eth2Node) {.async.} = +proc peerPingerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} = while true: let heartbeatStart_m = Moment.now() var updateFutures: seq[Future[void]] @@ -2012,7 +2072,7 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} = if peer.connectionState != Connected: continue if peer.metadata.isNone or - heartbeatStart_m - peer.lastMetadataTime > MetadataRequestFrequency: + heartbeatStart_m - peer.lastMetadataTime > MetadataRequestFrequency: updateFutures.add(node.updatePeerMetadata(peer.peerId)) await allFutures(updateFutures) @@ -2022,24 +2082,21 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} = if peer.failedMetadataRequests > MetadataRequestMaxFailures: debug "no metadata from peer, kicking it", peer - asyncSpawn peer.disconnect(PeerScoreLow) + await peer.disconnect(PeerScoreLow) await sleepAsync(5.seconds) -proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} = +proc peerTrimmerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} = + # Disconnect peers in excess of the (soft) max peer count while true: - # Peer trimmer + # Only count Connected peers (to avoid counting Disconnecting ones) + let + connectedPeers = node.peers.values.countIt( + it.connectionState == Connected) + excessPeers = connectedPeers - node.wantedPeers - # Only count Connected peers - # (to avoid counting Disconnecting ones) - var connectedPeers = 0 - for peer in node.peers.values: - if peer.connectionState == Connected: - inc connectedPeers - - let excessPeers = connectedPeers - node.wantedPeers if excessPeers > 0: - # Let chronos take back control every kick + # Let chronos take back control every trimming node.trimConnections(1) await sleepAsync(1.seconds div max(1, excessPeers)) @@ -2183,7 +2240,6 @@ proc createEth2Node*(rng: ref HmacDrbgContext, config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId) except CatchableError as exc: raise exc - except Exception as exc: raiseAssert exc.msg directPeers = block: var res: DirectPeers @@ -2389,19 +2445,26 @@ proc gossipEncode(msg: auto): seq[byte] = snappy.encode(uncompressed) proc broadcast(node: Eth2Node, topic: string, msg: seq[byte]): - Future[Result[void, cstring]] {.async.} = - let peers = await node.pubsub.publish(topic, msg) + Future[SendResult] {.async: (raises: [CancelledError]).} = + let peers = + try: + await node.pubsub.publish(topic, msg) + except CancelledError as exc: + raise exc + except CatchableError as exc: + warn "Unknown error during broadcast", exc = exc.msg + return err("Broadcast failed") # TODO remove workaround for sync committee BN/VC log spam if peers > 0 or find(topic, "sync_committee_") != -1: inc nbc_gossip_messages_sent - return ok() + ok() else: # Increments libp2p_gossipsub_failed_publish metric - return err("No peers on libp2p topic") + err("No peers on libp2p topic") proc broadcast(node: Eth2Node, topic: string, msg: auto): - Future[Result[void, cstring]] = + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = # Avoid {.async.} copies of message while broadcasting broadcast(node, topic, gossipEncode(msg)) @@ -2486,7 +2549,7 @@ proc getWallEpoch(node: Eth2Node): Epoch = proc broadcastAttestation*( node: Eth2Node, subnet_id: SubnetId, attestation: Attestation): - Future[SendResult] = + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = # Regardless of the contents of the attestation, # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/altair/p2p-interface.md#transitioning-the-gossip # implies that pre-fork, messages using post-fork digests might be @@ -2499,63 +2562,72 @@ proc broadcastAttestation*( node.broadcast(topic, attestation) proc broadcastVoluntaryExit*( - node: Eth2Node, exit: SignedVoluntaryExit): Future[SendResult] = + node: Eth2Node, exit: SignedVoluntaryExit): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getVoluntaryExitsTopic(node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, exit) proc broadcastAttesterSlashing*( - node: Eth2Node, slashing: AttesterSlashing): Future[SendResult] = + node: Eth2Node, slashing: AttesterSlashing): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getAttesterSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, slashing) proc broadcastProposerSlashing*( - node: Eth2Node, slashing: ProposerSlashing): Future[SendResult] = + node: Eth2Node, slashing: ProposerSlashing): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getProposerSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, slashing) proc broadcastBlsToExecutionChange*( node: Eth2Node, bls_to_execution_change: SignedBLSToExecutionChange): - Future[SendResult] = + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBlsToExecutionChangeTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, bls_to_execution_change) proc broadcastAggregateAndProof*( - node: Eth2Node, proof: SignedAggregateAndProof): Future[SendResult] = + node: Eth2Node, proof: SignedAggregateAndProof): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getAggregateAndProofsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, proof) proc broadcastBeaconBlock*( - node: Eth2Node, blck: phase0.SignedBeaconBlock): Future[SendResult] = + node: Eth2Node, blck: phase0.SignedBeaconBlock): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.phase0) node.broadcast(topic, blck) proc broadcastBeaconBlock*( - node: Eth2Node, blck: altair.SignedBeaconBlock): Future[SendResult] = + node: Eth2Node, blck: altair.SignedBeaconBlock): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.altair) node.broadcast(topic, blck) proc broadcastBeaconBlock*( - node: Eth2Node, blck: bellatrix.SignedBeaconBlock): Future[SendResult] = + node: Eth2Node, blck: bellatrix.SignedBeaconBlock): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix) node.broadcast(topic, blck) proc broadcastBeaconBlock*( - node: Eth2Node, blck: capella.SignedBeaconBlock): Future[SendResult] = + node: Eth2Node, blck: capella.SignedBeaconBlock): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.capella) node.broadcast(topic, blck) proc broadcastBeaconBlock*( - node: Eth2Node, blck: deneb.SignedBeaconBlock): Future[SendResult] = + node: Eth2Node, blck: deneb.SignedBeaconBlock): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.deneb) node.broadcast(topic, blck) proc broadcastBlobSidecar*( node: Eth2Node, subnet_id: BlobId, blob: deneb.BlobSidecar): - Future[SendResult] = + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch) topic = getBlobSidecarTopic(forkPrefix, subnet_id) @@ -2563,27 +2635,29 @@ proc broadcastBlobSidecar*( proc broadcastSyncCommitteeMessage*( node: Eth2Node, msg: SyncCommitteeMessage, - subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] = + subcommitteeIdx: SyncSubcommitteeIndex): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getSyncCommitteeTopic( node.forkDigestAtEpoch(node.getWallEpoch), subcommitteeIdx) node.broadcast(topic, msg) proc broadcastSignedContributionAndProof*( - node: Eth2Node, msg: SignedContributionAndProof): Future[SendResult] = + node: Eth2Node, msg: SignedContributionAndProof): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getSyncCommitteeContributionAndProofTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, msg) proc broadcastLightClientFinalityUpdate*( node: Eth2Node, msg: ForkyLightClientFinalityUpdate): - Future[SendResult] = + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getLightClientFinalityUpdateTopic( node.forkDigestAtEpoch(msg.contextEpoch)) node.broadcast(topic, msg) proc broadcastLightClientOptimisticUpdate*( node: Eth2Node, msg: ForkyLightClientOptimisticUpdate): - Future[SendResult] = + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getLightClientOptimisticUpdateTopic( node.forkDigestAtEpoch(msg.contextEpoch)) node.broadcast(topic, msg) diff --git a/beacon_chain/networking/eth2_protocol_dsl.nim b/beacon_chain/networking/eth2_protocol_dsl.nim index 33e9526a2..7bb86bef7 100644 --- a/beacon_chain/networking/eth2_protocol_dsl.nim +++ b/beacon_chain/networking/eth2_protocol_dsl.nim @@ -557,7 +557,7 @@ proc createSendProc*(msg: Message, def[3][0] = if procType == nnkMacroDef: ident "untyped" elif msg.kind == msgRequest and not isRawSender: - Fut(msg.requestResultType) + ident "auto" elif msg.kind == msgHandshake and not isRawSender: Fut(msg.recName) else: @@ -652,12 +652,17 @@ proc useStandardBody*(sendProc: SendProc, sendProc.setBody quote do: mixin init, WriterType, beginRecord, endRecord, getOutput - var `outputStream` = memoryOutput() - `preSerialization` - `serialization` - `postSerialization` - `tracing` - let `msgBytes` = getOutput(`outputStream`) + let `msgBytes` = + try: + var `outputStream` = memoryOutput() + `preSerialization` + `serialization` + `postSerialization` + `tracing` + getOutput(`outputStream`) + except IOError: + raiseAssert "memoryOutput doesn't raise IOError actually" + `sendCall` proc correctSerializerProcParams(params: NimNode) = diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 6bedebb89..cc21f05e1 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent = pool.outNotFullEvent proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, - filter: set[PeerType]) {.async.} = + filter: set[PeerType]) {.async: (raises: [CancelledError]).} = if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: var fut1 = incomingEvent(eventType).wait() var fut2 = outgoingEvent(eventType).wait() try: - discard await one(fut1, fut2) + try: + discard await one(fut1, fut2) + except ValueError: + raiseAssert "one precondition satisfied" if fut1.finished(): if not(fut2.finished()): await fut2.cancelAndWait() @@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, outgoingEvent(eventType).clear() proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): Future[void] = + filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = pool.waitForEvent(EventType.NotEmptyEvent, filter) proc waitNotFullEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): Future[void] = + filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = pool.waitForEvent(EventType.NotFullEvent, filter) proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, @@ -451,7 +454,7 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], {PeerType.Outgoing} proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], - peerType: PeerType) {.async.} = + peerType: PeerType) {.async: (raises: [CancelledError]).} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. let mask = pool.getPeerSpaceMask(peerType) @@ -459,7 +462,7 @@ proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], await pool.waitNotFullEvent(mask) proc addPeer*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): Future[PeerStatus] {.async.} = + peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B], proc acquire*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[A] {.async.} = + PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. mixin getKey doAssert(filter != {}, "Filter must not be empty") @@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = proc acquire*[A, B](pool: PeerPool[A, B], number: int, filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[seq[A]] {.async.} = + PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") @@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) = pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} = +proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. var acquired = newSeq[A]() diff --git a/beacon_chain/networking/peer_protocol.nim b/beacon_chain/networking/peer_protocol.nim index 47725065c..514e8f555 100644 --- a/beacon_chain/networking/peer_protocol.nim +++ b/beacon_chain/networking/peer_protocol.nim @@ -110,7 +110,7 @@ proc checkStatusMsg(state: PeerSyncNetworkState, status: StatusMsg): proc handleStatus(peer: Peer, state: PeerSyncNetworkState, - theirStatus: StatusMsg): Future[bool] {.gcsafe.} + theirStatus: StatusMsg): Future[bool] {.async: (raises: [CancelledError]).} {.pop.} # TODO fix p2p macro for raises @@ -118,7 +118,7 @@ p2pProtocol PeerSync(version = 1, networkState = PeerSyncNetworkState, peerState = PeerSyncPeerState): - onPeerConnected do (peer: Peer, incoming: bool) {.async.}: + onPeerConnected do (peer: Peer, incoming: bool) {.async: (raises: [CancelledError]).}: debug "Peer connected", peer, peerId = shortLog(peer.peerId), incoming # Per the eth2 protocol, whoever dials must send a status message when @@ -155,7 +155,7 @@ p2pProtocol PeerSync(version = 1, proc ping(peer: Peer, value: uint64): uint64 {.libp2pProtocol("ping", 1).} = - return peer.network.metadata.seq_number + peer.network.metadata.seq_number # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2 proc getMetaData(peer: Peer): uint64 @@ -164,10 +164,9 @@ p2pProtocol PeerSync(version = 1, proc getMetadata_v2(peer: Peer): altair.MetaData {.libp2pProtocol("metadata", 2).} = - return peer.network.metadata + peer.network.metadata - proc goodbye(peer: Peer, - reason: uint64) + proc goodbye(peer: Peer, reason: uint64) {.async, libp2pProtocol("goodbye", 1).} = debug "Received Goodbye message", reason = disconnectReasonName(reason), peer @@ -178,7 +177,8 @@ proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) = proc handleStatus(peer: Peer, state: PeerSyncNetworkState, - theirStatus: StatusMsg): Future[bool] {.async.} = + theirStatus: StatusMsg): Future[bool] + {.async: (raises: [CancelledError]).} = let res = checkStatusMsg(state, theirStatus) @@ -195,21 +195,16 @@ proc handleStatus(peer: Peer, await peer.handlePeer() true -proc updateStatus*(peer: Peer): Future[bool] {.async.} = +proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]).} = ## Request `status` of remote peer ``peer``. let nstate = peer.networkState(PeerSync) ourStatus = getCurrentStatus(nstate) + theirStatus = + (await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)).valueOr: + return false - let theirFut = awaitne peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR) - if theirFut.failed(): - return false - else: - let theirStatus = theirFut.read() - if theirStatus.isOk: - return await peer.handleStatus(nstate, theirStatus.get()) - else: - return false + await peer.handleStatus(nstate, theirStatus) proc getHeadSlot*(peer: Peer): Slot = ## Returns head slot for specific peer ``peer``. diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 9e8b8ce12..2905320f9 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -351,7 +351,7 @@ proc initFullNode( blobQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] = + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach # taken in the sync/request managers - this is an architectural compromise @@ -360,27 +360,23 @@ proc initFullNode( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] = + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): - when typeof(forkyBlck).kind >= ConsensusFork.Deneb: + when consensusFork >= ConsensusFork.Deneb: if not blobQuarantine[].hasBlobs(forkyBlck): # We don't have all the blobs for this block, so we have # to put it in blobless quarantine. if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): - Future.completed( - Result[void, VerifierError].err(VerifierError.UnviableFork), - "rmanBlockVerifier") + err(VerifierError.UnviableFork) else: - Future.completed( - Result[void, VerifierError].err(VerifierError.MissingParent), - "rmanBlockVerifier") + err(VerifierError.MissingParent) else: let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.some(blobs), maybeFinalized = maybeFinalized) else: - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), maybeFinalized = maybeFinalized) diff --git a/beacon_chain/sync/light_client_protocol.nim b/beacon_chain/sync/light_client_protocol.nim index d4aa4a47f..ba2ee81b7 100644 --- a/beacon_chain/sync/light_client_protocol.nim +++ b/beacon_chain/sync/light_client_protocol.nim @@ -31,7 +31,7 @@ type proc readChunkPayload*( conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject): - Future[NetRes[MsgType]] {.async.} = + Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = var contextBytes: ForkDigest try: await conn.readExactly(addr contextBytes, sizeof contextBytes) diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 008bc5bd0..1e57e4798 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -39,7 +39,7 @@ const type BlockVerifierFn* = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.gcsafe, raises: [].} + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} InhibitFn* = proc: bool {.gcsafe, raises:[].} RequestManager* = object @@ -49,8 +49,8 @@ type quarantine: ref Quarantine blobQuarantine: ref BlobQuarantine blockVerifier: BlockVerifierFn - blockLoopFuture: Future[void] - blobLoopFuture: Future[void] + blockLoopFuture: Future[void].Raising([CancelledError]) + blobLoopFuture: Future[void].Raising([CancelledError]) func shortLog*(x: seq[Eth2Digest]): string = "[" & x.mapIt(shortLog(it)).join(", ") & "]" @@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier], return false true -proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} = +proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} = var peer: Peer try: peer = await rman.network.peerPool.acquire() @@ -171,19 +171,13 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} peer = peer, blocks = shortLog(items), err = blocks.error() peer.updateScore(PeerScoreNoValues) - except CancelledError as exc: - raise exc - except CatchableError as exc: - peer.updateScore(PeerScoreNoValues) - debug "Error while fetching blocks by root", exc = exc.msg, - items = shortLog(items), peer = peer, peer_score = peer.getScore() - raise exc finally: if not(isNil(peer)): rman.network.peerPool.release(peer) proc fetchBlobsFromNetwork(self: RequestManager, - idList: seq[BlobIdentifier]) {.async.} = + idList: seq[BlobIdentifier]) + {.async: (raises: [CancelledError]).} = var peer: Peer try: @@ -191,7 +185,7 @@ proc fetchBlobsFromNetwork(self: RequestManager, debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList), peer_score = peer.getScore() - let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList)) + let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList) if blobs.isOk: let ublobs = blobs.get() @@ -219,18 +213,11 @@ proc fetchBlobsFromNetwork(self: RequestManager, peer = peer, blobs = shortLog(idList), err = blobs.error() peer.updateScore(PeerScoreNoValues) - except CancelledError as exc: - raise exc - except CatchableError as exc: - peer.updateScore(PeerScoreNoValues) - debug "Error while fetching blobs by root", exc = exc.msg, - idList = shortLog(idList), peer = peer, peer_score = peer.getScore() - raise exc finally: if not(isNil(peer)): self.network.peerPool.release(peer) -proc requestManagerBlockLoop(rman: RequestManager) {.async.} = +proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: # TODO This polling could be replaced with an AsyncEvent that is fired # from the quarantine when there's work to do @@ -245,33 +232,19 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} = continue debug "Requesting detected missing blocks", blocks = shortLog(blocks) - try: - let start = SyncMoment.now(0) + let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void]] + var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] - for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.requestBlocksByRoot(blocks) + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.requestBlocksByRoot(blocks) - await allFutures(workers) + await allFutures(workers) - let finish = SyncMoment.now(uint64(len(blocks))) - - var succeed = 0 - for worker in workers: - if worker.completed(): - inc(succeed) - - debug "Request manager block tick", blocks = shortLog(blocks), - succeed = succeed, - failed = (len(workers) - succeed), - sync_speed = speed(start, finish) - - except CancelledError: - break - except CatchableError as exc: - warn "Unexpected error in request manager block loop", exc = exc.msg + let finish = SyncMoment.now(uint64(len(blocks))) + debug "Request manager block tick", blocks = shortLog(blocks), + sync_speed = speed(start, finish) proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = let @@ -308,42 +281,28 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = rman.quarantine[].removeBlobless(blobless) fetches - -proc requestManagerBlobLoop(rman: RequestManager) {.async.} = +proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: - # TODO This polling could be replaced with an AsyncEvent that is fired - # from the quarantine when there's work to do + # TODO This polling could be replaced with an AsyncEvent that is fired + # from the quarantine when there's work to do await sleepAsync(POLL_INTERVAL) if rman.inhibit(): continue - let fetches = rman.getMissingBlobs() - if fetches.len > 0: - debug "Requesting detected missing blobs", blobs = shortLog(fetches) - try: - let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void]] - for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.fetchBlobsFromNetwork(fetches) + let fetches = rman.getMissingBlobs() + if fetches.len > 0: + debug "Requesting detected missing blobs", blobs = shortLog(fetches) + let start = SyncMoment.now(0) + var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.fetchBlobsFromNetwork(fetches) - await allFutures(workers) - let finish = SyncMoment.now(uint64(len(fetches))) + await allFutures(workers) + let finish = SyncMoment.now(uint64(len(fetches))) - var succeed = 0 - for worker in workers: - if worker.finished() and not(worker.failed()): - inc(succeed) - - debug "Request manager blob tick", - blobs_count = len(fetches), - succeed = succeed, - failed = (len(workers) - succeed), - sync_speed = speed(start, finish) - - except CancelledError: - break - except CatchableError as exc: - warn "Unexpected error in request manager blob loop", exc = exc.msg + debug "Request manager blob tick", + blobs_count = len(fetches), + sync_speed = speed(start, finish) proc start*(rman: var RequestManager) = ## Start Request Manager's loops. diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 4e231eb13..6e66b99da 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -43,7 +43,7 @@ type NoMonitor SyncWorker*[A, B] = object - future: Future[void] + future: Future[void].Raising([CancelledError]) status: SyncWorkerStatus SyncManager*[A, B] = ref object @@ -158,8 +158,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], res.initQueue() res -proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest): Future[BeaconBlocksRes] {.async.} = +proc getBlocks[A, B](man: SyncManager[A, B], peer: A, + req: SyncRequest): Future[BeaconBlocksRes] {. + async: (raises: [CancelledError], raw: true).} = mixin getScore, `==` logScope: @@ -171,21 +172,8 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blocks from peer", request = req - try: - let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) - if res.isErr(): - debug "Error, while reading getBlocks response", request = req, - error = $res.error() - return - return res - except CancelledError: - debug "Interrupt, while waiting getBlocks response", request = req - return - except CatchableError as exc: - debug "Error, while waiting getBlocks response", request = req, - errName = exc.name, errMsg = exc.msg - return + beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = let wallEpoch = man.getLocalWallSlot().epoch @@ -194,8 +182,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest - ): Future[BlobSidecarsRes] {.async.} = + req: SyncRequest): Future[BlobSidecarsRes] + {.async: (raises: [CancelledError], raw: true).} = mixin getScore, `==` logScope: @@ -207,21 +195,7 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blobs sidecars from peer", request = req - try: - let res = await blobSidecarsByRange(peer, req.slot, req.count) - - if res.isErr(): - debug "Error, while reading blobSidecarsByRange response", request = req, - error = $res.error() - return - return res - except CancelledError: - debug "Interrupt, while waiting blobSidecarsByRange response", request = req - return - except CatchableError as exc: - debug "Error, while waiting blobSidecarsByRange response", request = req, - errName = exc.name, errMsg = exc.msg - return + blobSidecarsByRange(peer, req.slot, req.count) proc remainingSlots(man: SyncManager): uint64 = let @@ -282,7 +256,8 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] = ? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ok() -proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = +proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) + {.async: (raises: [CancelledError]).} = logScope: peer_score = peer.getScore() peer_speed = peer.netKbps() @@ -322,17 +297,11 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = trace "Updating peer's status information", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot - try: - let res = await peer.updateStatus() - if not(res): - peer.updateScore(PeerScoreNoStatus) - debug "Failed to get remote peer's status, exiting", - peer_head_slot = peerSlot + if not await peer.updateStatus(): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", + peer_head_slot = peerSlot - return - except CatchableError as exc: - debug "Unexpected exception while updating peer's status", - peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg return let newPeerSlot = peer.getHeadSlot() @@ -419,110 +388,97 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = man.workers[index].status = SyncWorkerStatus.Downloading - try: - let blocks = await man.getBlocks(peer, req) - if blocks.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blocks on request", request = req - return - let blockData = blocks.get().asSeq() - let blockSmap = getShortMap(req, blockData) - debug "Received blocks on request", blocks_count = len(blockData), - blocks_map = blockSmap, request = req - - let slots = mapIt(blockData, it[].slot) - if not(checkResponse(req, slots)): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blocks sequence is not in requested range", - blocks_count = len(blockData), blocks_map = blockSmap, - request = req - return - - func combine(acc: seq[Slot], cur: Slot): seq[Slot] = - var copy = acc - if copy[copy.len-1] != cur: - copy.add(cur) - copy - - let blobData = - if man.shouldGetBlobs(req.slot.epoch): - let blobs = await man.getBlobSidecars(peer, req) - if blobs.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blobs on request", request = req - return - let blobData = blobs.get().asSeq() - let blobSmap = getShortMap(req, blobData) - debug "Received blobs on request", blobs_count = len(blobData), - blobs_map = blobSmap, request = req - - if len(blobData) > 0: - let slots = mapIt(blobData, it[].signed_block_header.message.slot) - let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) - if not(checkResponse(req, uniqueSlots)): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blobs sequence is not in requested range", - blobs_count = len(blobData), blobs_map = getShortMap(req, blobData), - request = req - return - let groupedBlobs = groupBlobs(req, blockData, blobData) - if groupedBlobs.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - info "Received blobs sequence is inconsistent", - blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error() - return - if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blobs sequence is invalid", - blobs_count = len(blobData), - blobs_map = getShortMap(req, blobData), - request = req, - msg = checkRes.error - return - Opt.some(groupedBlobs.get()) - else: - Opt.none(seq[BlobSidecars]) - - if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and - req.contains(man.getSafeSlot()): - # The sync protocol does not distinguish between: - # - All requested slots are empty - # - Peer does not have data available about requested range - # - # However, we include the `backfill` slot in backward sync requests. - # If we receive an empty response to a request covering that slot, - # we know that the response is incomplete and can descore. - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Response does not include known-to-exist block", request = req - return - - # Scoring will happen in `syncUpdate`. - man.workers[index].status = SyncWorkerStatus.Queueing - let - peerFinalized = peer.getFinalizedEpoch().start_slot() - lastSlot = req.slot + req.count - # The peer claims the block is finalized - our own block processing will - # verify this point down the line - # TODO descore peers that lie - maybeFinalized = lastSlot < peerFinalized - - await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = - man.workers[index].status = SyncWorkerStatus.Processing) - - except CatchableError as exc: + let blocks = (await man.getBlocks(peer, req)).valueOr: + peer.updateScore(PeerScoreNoValues) man.queue.push(req) - debug "Unexpected exception while receiving blocks", request = req, - errName = exc.name, errMsg = exc.msg + debug "Failed to receive blocks on request", request = req return -proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = + let blockSmap = getShortMap(req, blocks.asSeq()) + debug "Received blocks on request", blocks_count = len(blocks), + blocks_map = blockSmap, request = req + + let slots = mapIt(blocks, it[].slot) + if not(checkResponse(req, slots)): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blocks sequence is not in requested range", + blocks_count = len(blocks), blocks_map = blockSmap, + request = req + return + + func combine(acc: seq[Slot], cur: Slot): seq[Slot] = + var copy = acc + if copy[copy.len-1] != cur: + copy.add(cur) + copy + + let blobData = + if man.shouldGetBlobs(req.slot.epoch): + let blobs = (await man.getBlobSidecars(peer, req)).valueOr: + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive blobs on request", request = req + return + let blobSmap = getShortMap(req, blobs.asSeq()) + debug "Received blobs on request", blobs_count = len(blobs), + blobs_map = blobSmap, request = req + + if len(blobs) > 0: + let slots = mapIt(blobs, it[].signed_block_header.message.slot) + let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + if not(checkResponse(req, uniqueSlots)): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blobs sequence is not in requested range", + blobs_count = len(blobs), blobs_map = blobSmap, + request = req + return + let groupedBlobs = groupBlobs(req, blocks.asSeq(), blobs.asSeq()) + if groupedBlobs.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + info "Received blobs sequence is inconsistent", + blobs_map = blobSmap, request = req, msg=groupedBlobs.error() + return + if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blobs sequence is invalid", + blobs_map = blobSmap, request = req, msg=groupedBlobs.error() + return + Opt.some(groupedBlobs.get()) + else: + Opt.none(seq[BlobSidecars]) + + if len(blocks) == 0 and man.direction == SyncQueueKind.Backward and + req.contains(man.getSafeSlot()): + # The sync protocol does not distinguish between: + # - All requested slots are empty + # - Peer does not have data available about requested range + # + # However, we include the `backfill` slot in backward sync requests. + # If we receive an empty response to a request covering that slot, + # we know that the response is incomplete and can descore. + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Response does not include known-to-exist block", request = req + return + + # Scoring will happen in `syncUpdate`. + man.workers[index].status = SyncWorkerStatus.Queueing + let + peerFinalized = peer.getFinalizedEpoch().start_slot() + lastSlot = req.slot + req.count + # The peer claims the block is finalized - our own block processing will + # verify this point down the line + # TODO descore peers that lie + maybeFinalized = lastSlot < peerFinalized + + await man.queue.push(req, blocks.asSeq(), blobData, maybeFinalized, proc() = + man.workers[index].status = SyncWorkerStatus.Processing) + +proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} = mixin getKey, getScore, getHeadSlot logScope: @@ -533,30 +489,21 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = debug "Starting syncing worker" - while true: - var peer: A = nil - let doBreak = - try: - man.workers[index].status = SyncWorkerStatus.Sleeping - # This event is going to be set until we are not in sync with network - await man.notInSyncEvent.wait() - man.workers[index].status = SyncWorkerStatus.WaitingPeer - peer = await man.pool.acquire() - await man.syncStep(index, peer) - man.pool.release(peer) - false - except CancelledError: - if not(isNil(peer)): - man.pool.release(peer) - true - except CatchableError as exc: - debug "Unexpected exception in sync worker", - peer = peer, peer_score = peer.getScore(), - peer_speed = peer.netKbps(), - errName = exc.name, errMsg = exc.msg - true - if doBreak: - break + var peer: A = nil + + try: + while true: + man.workers[index].status = SyncWorkerStatus.Sleeping + # This event is going to be set until we are not in sync with network + await man.notInSyncEvent.wait() + man.workers[index].status = SyncWorkerStatus.WaitingPeer + peer = await man.pool.acquire() + await man.syncStep(index, peer) + man.pool.release(peer) + peer = nil + finally: + if not(isNil(peer)): + man.pool.release(peer) debug "Sync worker stopped" @@ -593,34 +540,10 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, map[i] = ch (map, sleeping, waiting, pending) -proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = - logScope: - index = index - sync_ident = man.ident - direction = man.direction - topics = "syncman" - - var pending: array[SyncWorkersCount, Future[void]] - +proc startWorkers[A, B](man: SyncManager[A, B]) = # Starting all the synchronization workers. for i in 0 ..< len(man.workers): - let future = syncWorker[A, B](man, i) - man.workers[i].future = future - pending[i] = future - - # Wait for synchronization worker's failure and replace it with new one. - while true: - let failFuture = await one(pending) - let index = pending.find(failFuture) - if failFuture.failed(): - warn "Synchronization worker stopped working unexpectedly with an error", - errName = failFuture.error.name, errMsg = failFuture.error.msg - else: - warn "Synchronization worker stopped working unexpectedly without error" - - let future = syncWorker[A, B](man, index) - man.workers[index].future = future - pending[index] = future + man.workers[i].future = syncWorker[A, B](man, i) proc toTimeLeftString*(d: Duration): string = if d == InfiniteDuration: @@ -648,11 +571,9 @@ proc toTimeLeftString*(d: Duration): string = res = res & "00m" res -proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void], +proc syncClose[A, B](man: SyncManager[A, B], speedTaskFut: Future[void]) {.async.} = var pending: seq[FutureBase] - if not(guardTaskFut.finished()): - pending.add(guardTaskFut.cancelAndWait()) if not(speedTaskFut.finished()): pending.add(speedTaskFut.cancelAndWait()) for worker in man.workers: @@ -669,11 +590,11 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = mixin getKey, getScore var pauseTime = 0 - var guardTaskFut = man.guardTask() + man.startWorkers() debug "Synchronization loop started" - proc averageSpeedTask() {.async.} = + proc averageSpeedTask() {.async: (raises: [CancelledError]).} = while true: # Reset sync speeds between each loss-of-sync event man.avgSyncSpeed = 0 @@ -703,7 +624,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = stamp = newStamp - var averageSpeedTaskFut = averageSpeedTask() + let averageSpeedTaskFut = averageSpeedTask() while true: let wallSlot = man.getLocalWallSlot() @@ -788,7 +709,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Forward: if man.inProgress: if SyncManagerFlag.NoMonitor in man.flags: - await man.syncClose(guardTaskFut, averageSpeedTaskFut) + await man.syncClose(averageSpeedTaskFut) man.inProgress = false debug "Forward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot, @@ -809,10 +730,8 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Backward: # Backward syncing is going to be executed only once, so we exit loop # and stop all pending tasks which belongs to this instance (sync - # workers, guard task and speed calculation task). - # We first need to cancel and wait for guard task, because otherwise - # it will be able to restore cancelled workers. - await man.syncClose(guardTaskFut, averageSpeedTaskFut) + # workers, speed calculation task). + await man.syncClose(averageSpeedTaskFut) man.inProgress = false debug "Backward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot, diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index c1b13f30f..8fdfde656 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -35,17 +35,14 @@ type BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS] BlobIdentifierList* = List[BlobIdentifier, Limit (MAX_REQUEST_BLOB_SIDECARS)] -template readChunkPayload*( - conn: Connection, peer: Peer, MsgType: type ForkySignedBeaconBlock): - Future[NetRes[MsgType]] = - readChunkPayload(conn, peer, MsgType) - proc readChunkPayload*( conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)): - Future[NetRes[MsgType]] {.async.} = + Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = var contextBytes: ForkDigest try: await conn.readExactly(addr contextBytes, sizeof contextBytes) + except CancelledError as exc: + raise exc except CatchableError: return neterr UnexpectedEOF @@ -84,10 +81,12 @@ proc readChunkPayload*( proc readChunkPayload*( conn: Connection, peer: Peer, MsgType: type (ref BlobSidecar)): - Future[NetRes[MsgType]] {.async.} = + Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = var contextBytes: ForkDigest try: await conn.readExactly(addr contextBytes, sizeof contextBytes) + except CancelledError as exc: + raise exc except CatchableError: return neterr UnexpectedEOF diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 22d5100b8..66d7bbc1b 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -27,7 +27,7 @@ type ProcessingCallback* = proc() {.gcsafe, raises: [].} BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] {.gcsafe, raises: [].} + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} SyncQueueKind* {.pure.} = enum Forward, Backward @@ -50,7 +50,7 @@ type item*: T SyncWaiter* = ref object - future: Future[void] + future: Future[void].Raising([CancelledError]) reset: bool RewindPoint = object @@ -311,9 +311,9 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) = if not(item.future.finished()): item.future.complete() -proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = +proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} = ## Create new waiter and wait for completion from `wakeupWaiters()`. - var waitfut = newFuture[void]("SyncQueue.waitForChanges") + let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges") let waititem = SyncWaiter(future: waitfut) sq.waiters.add(waititem) try: @@ -322,7 +322,7 @@ proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = finally: sq.waiters.delete(sq.waiters.find(waititem)) -proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = +proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} = ## This procedure will perform wakeupWaiters(true) and blocks until last ## waiter will be awakened. var waitChanges = sq.waitForChanges() @@ -333,7 +333,7 @@ proc clearAndWakeup*[T](sq: SyncQueue[T]) = sq.pending.clear() sq.wakeupWaiters(true) -proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = +proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} = ## Perform reset of all the blocked waiters in SyncQueue. ## ## We adding one more waiter to the waiters sequence and @@ -610,7 +610,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], data: seq[ref ForkedSignedBeaconBlock], blobs: Opt[seq[BlobSidecars]], maybeFinalized: bool = false, - processingCb: ProcessingCallback = nil) {.async.} = + processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = logScope: sync_ident = sq.ident topics = "syncman" diff --git a/docs/the_nimbus_book/src/developers.md b/docs/the_nimbus_book/src/developers.md index 0c9cace1d..16dc53236 100644 --- a/docs/the_nimbus_book/src/developers.md +++ b/docs/the_nimbus_book/src/developers.md @@ -254,5 +254,5 @@ build/block_sim --slots=384 --validators=20000 --attesterRatio=0.66 ## Sync from a specific peer ```sh -build/nimbus_beacon_node --discv5:off --tcp-port=9876 --direct-peer="/ip4/127.0.0.1/tcp/9000/p2p/$(curl -s -X 'GET' 'http://localhost:5052/eth/v1/node/identity' -H 'accept: application/json' | jq -r .data.peer_id)" +build/nimbus_beacon_node --no-el --discv5:off --tcp-port=9876 --direct-peer="/ip4/127.0.0.1/tcp/9000/p2p/$(curl -s -X 'GET' 'http://localhost:5052/eth/v1/node/identity' -H 'accept: application/json' | jq -r .data.peer_id)" ``` diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index bdbde9f2b..d68db308d 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # the BlockProcessor and this test proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] = - let fut = newFuture[Result[void, VerifierError]]() + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init() try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) except CatchableError as exc: raiseAssert exc.msg return fut