diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index f8840c4cd..51f405f41 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -392,15 +392,15 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {. raise newException(TransmissionError, "Failed to open LibP2P stream") let stream = streamFut.read - defer: + try: + var s = memoryOutput() + s.appendVarint requestBytes.len.uint64 + s.append requestBytes + let bytes = s.getOutput + await stream.write(bytes) + finally: await safeClose(stream) - var s = memoryOutput() - s.appendVarint requestBytes.len.uint64 - s.append requestBytes - let bytes = s.getOutput - await stream.write(bytes) - # TODO There is too much duplication in the responder functions, but # I hope to reduce this when I increse the reliance on output streams. proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} = @@ -442,19 +442,19 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, return none(ResponseMsg) let stream = streamFut.read - defer: + try: + # Send the request + var s = memoryOutput() + s.appendVarint requestBytes.len.uint64 + s.append requestBytes + let bytes = s.getOutput + await stream.write(bytes) + + # Read the response + return await stream.readResponse(ResponseMsg, deadline) + finally: await safeClose(stream) - # Send the request - var s = memoryOutput() - s.appendVarint requestBytes.len.uint64 - s.append requestBytes - let bytes = s.getOutput - await stream.write(bytes) - - # Read the response - return await stream.readResponse(ResponseMsg, deadline) - proc init*[MsgType](T: type Responder[MsgType], peer: Peer, conn: Connection): T = T(UntypedResponder(peer: peer, stream: conn)) @@ -546,38 +546,38 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, handleIncomingPeer(peer) - defer: + try: + let + deadline = sleepAsync RESP_TIMEOUT + msgBytes = await readMsgBytes(conn, false, deadline) + + if msgBytes.len == 0: + await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg) + return + + type MsgRec = RecType(MsgType) + var msg: MsgRec + try: + msg = decode(Format, msgBytes, MsgRec) + except SerializationError as err: + await sendErrorResponse(peer, conn, err, msgName, msgBytes) + return + except Exception as err: + # TODO. This is temporary code that should be removed after interop. + # It can be enabled only in certain diagnostic builds where it should + # re-raise the exception. + debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName + await sendErrorResponse(peer, conn, ServerError, err.msg) + raise err + + try: + logReceivedMsg(peer, MsgType(msg)) + await callUserHandler(peer, conn, msg) + except CatchableError as err: + await sendErrorResponse(peer, conn, ServerError, err.msg) + finally: await safeClose(conn) - let - deadline = sleepAsync RESP_TIMEOUT - msgBytes = await readMsgBytes(conn, false, deadline) - - if msgBytes.len == 0: - await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg) - return - - type MsgRec = RecType(MsgType) - var msg: MsgRec - try: - msg = decode(Format, msgBytes, MsgRec) - except SerializationError as err: - await sendErrorResponse(peer, conn, err, msgName, msgBytes) - return - except Exception as err: - # TODO. This is temporary code that should be removed after interop. - # It can be enabled only in certain diagnostic builds where it should - # re-raise the exception. - debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName - await sendErrorResponse(peer, conn, ServerError, err.msg) - raise err - - try: - logReceivedMsg(peer, MsgType(msg)) - await callUserHandler(peer, conn, msg) - except CatchableError as err: - await sendErrorResponse(peer, conn, ServerError, err.msg) - proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} = let network = peer.network