From f055fad08af63e7d884eaebb3f2189e157765788 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Wed, 13 May 2020 01:35:40 +0300 Subject: [PATCH] Make the Snappy FastStreams integration optional by duplicating it for LibP2P streams --- beacon_chain/eth2_network.nim | 252 +++++++++--------------- beacon_chain/faststreams_backend.nim | 118 +++++++++++ beacon_chain/libp2p_streams_backend.nim | 188 ++++++++++++++++++ vendor/nim-chronicles | 2 +- 4 files changed, 398 insertions(+), 162 deletions(-) create mode 100644 beacon_chain/faststreams_backend.nim create mode 100644 beacon_chain/libp2p_streams_backend.nim diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 4531bf01c..88588dfdc 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -168,60 +168,16 @@ declarePublicGauge libp2p_successful_dials, declarePublicGauge libp2p_peers, "Number of active libp2p peers" -type - LibP2PInputStream = ref object of InputStream - conn: Connection - -const - closingErrMsg = "Failed to close LibP2P stream" - readingErrMsg = "Failed to read from LibP2P stream" - proc safeClose(conn: Connection) {.async.} = if not conn.closed: await close(conn) -proc libp2pCloseWait(s: LibP2PInputStream) {.async.} = - fsTranslateErrors closingErrMsg: - await safeClose(s.conn) +const + snappy_implementation {.strdefine.} = "libp2p" -proc libp2pReadOnce(s: LibP2PInputStream, - dst: pointer, dstLen: Natural): Future[Natural] {.async.} = - fsTranslateErrors readingErrMsg: - try: - return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {}, - readStartAddr, readLen): - await s.conn.readOnce(readStartAddr, readLen) - except LPStreamEOFError: - s.buffers.eofReached = true - -# TODO: Use the Raising type here -let libp2pInputVTable = InputStreamVTable( - readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural - {.nimcall, gcsafe, raises: [IOError, Defect].} = - doAssert(false, "synchronous reading is not allowed") - , - readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural] - {.nimcall, gcsafe, raises: [IOError, Defect].} = - fsTranslateErrors "Unexpected exception from merely forwarding a future": - return libp2pReadOnce(Libp2pInputStream s, dst, dstLen) - , - closeSync: proc (s: InputStream) - {.nimcall, gcsafe, raises: [IOError, Defect].} = - fsTranslateErrors closingErrMsg: - s.closeFut = Libp2pInputStream(s).conn.close() - , - closeAsync: proc (s: InputStream): Future[void] - {.nimcall, gcsafe, raises: [IOError, Defect].} = - fsTranslateErrors "Unexpected exception from merely forwarding a future": - return libp2pCloseWait(Libp2pInputStream s) -) - -func libp2pInput*(conn: Connection, - pageSize = defaultPageSize): AsyncInputStream = - AsyncInputStream LibP2PInputStream( - vtable: vtableAddr libp2pInputVTable, - buffers: initPageBuffers(pageSize), - conn: conn) +const useNativeSnappy = when snappy_implementation == "native": true + elif snappy_implementation == "libp2p": false + else: {.fatal: "Please set snappy_implementation to either 'libp2p' or 'native'".} template libp2pProtocol*(name: string, version: int) {.pragma.} @@ -306,73 +262,6 @@ proc disconnectAndRaise(peer: Peer, await peer.disconnect(r) raisePeerDisconnected(msg, r) -proc readSizePrefix(s: AsyncInputStream, maxSize: uint64): Future[int] {.async.} = - trace "about to read msg size prefix" - var parser: VarintParser[uint64, ProtoBuf] - while s.readable: - case parser.feedByte(s.read) - of Done: - let res = parser.getResult - if res > maxSize: - trace "size prefix outside of range", res - return -1 - else: - trace "got size prefix", res - return int(res) - of Overflow: - trace "size prefix overflow" - return -1 - of Incomplete: - continue - -proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} = - let size = await s.readSizePrefix(uint64(MAX_CHUNK_SIZE)) - if size > 0 and s.readable(size): - s.withReadableRange(size, r): - return r.readValue(SSZ, MsgType) - else: - raise newException(CatchableError, - "Failed to read an incoming message size prefix") - -proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} = - if s.readable: - let responseCode = s.read - static: assert responseCode.type.low == 0 - if responseCode > ResponseCode.high.byte: - return err("Invalid response code") - - case ResponseCode(responseCode): - of InvalidRequest, ServerError: - return err(await s.readSszValue(string)) - of Success: - return ok true - else: - return ok false - -proc readChunk(s: AsyncInputStream, - MsgType: typedesc): Future[Option[MsgType]] {.async.} = - let rc = await s.readResponseCode() - if rc.isOk: - if rc[]: - return some(await readSszValue(s, MsgType)) - else: - trace "Failed to read response code", - reason = rc.error - -proc readResponse(s: AsyncInputStream, - MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} = - when MsgType is seq: - type E = ElemType(MsgType) - var results: MsgType - while true: - let nextRes = await s.readChunk(E) - if nextRes.isNone: break - results.add nextRes.get - if results.len > 0: - return some(results) - else: - return await s.readChunk(MsgType) - proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes = var s = memoryOutput() s.write byte(responseCode) @@ -441,22 +330,31 @@ proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async. let bytes = s.getOutput await responder.stream.write(bytes) +when useNativeSnappy: + include faststreams_backend +else: + include libp2p_streams_backend + +template awaitWithTimeout[T](operation: Future[T], + deadline: Future[void], + onTimeout: untyped): T = + let f = operation + await f or deadline + if not f.finished: + cancel f + onTimeout + else: + f.read + proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} = var deadline = sleepAsync timeout protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz") - streamFut = peer.network.openStream(peer, protocolId) - - await streamFut or deadline - - if not streamFut.finished: - streamFut.cancel() - return none(ResponseMsg) - - let stream = streamFut.read + let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId), + deadline): return none(ResponseMsg) try: # Send the request var s = memoryOutput() @@ -469,8 +367,11 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, await stream.write(bytes) # Read the response - let response = libp2pInput(stream) - return await response.readResponse(ResponseMsg) + when useNativeSnappy: + return awaitWithTimeout(readResponse(libp2pInput(stream), ResponseMsg), + deadline, none(ResponseMsg)) + else: + return await readResponse(stream, ResponseMsg, deadline) finally: await safeClose(stream) @@ -553,6 +454,8 @@ proc handleIncomingStream(network: Eth2Node, useSnappy: bool, MsgType: type) {.async, gcsafe.} = mixin callUserHandler, RecType + + type MsgRec = RecType(MsgType) const msgName = typetraits.name(MsgType) ## Uncomment this to enable tracing on all incoming requests @@ -566,46 +469,73 @@ proc handleIncomingStream(network: Eth2Node, try: let peer = peerFromStream(network, conn) - let s = libp2pInput(conn) + when useNativeSnappy: + let s = libp2pInput(conn) - if s.timeoutToNextByte(TTFB_TIMEOUT): - await sendErrorResponse(peer, conn, InvalidRequest, - "Request first byte not sent in time") - return + if s.timeoutToNextByte(TTFB_TIMEOUT): + await sendErrorResponse(peer, conn, InvalidRequest, + "Request first byte not sent in time") + return - let deadline = sleepAsync RESP_TIMEOUT + let deadline = sleepAsync RESP_TIMEOUT + + let processingFut = if useSnappy: + s.executePipeline(uncompressFramedStream, + readSszValue MsgRec) + else: + s.readSszValue MsgRec + + try: + await processingFut or deadline + except SerializationError as err: + await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg")) + return + except SnappyError as err: + await sendErrorResponse(peer, conn, InvalidRequest, err.msg) + return + + if not processingFut.finished: + processingFut.cancel() + await sendErrorResponse(peer, conn, InvalidRequest, + "Request full data not sent in time") + return + + try: + logReceivedMsg(peer, MsgType(processingFut.read)) + await callUserHandler(peer, conn, processingFut.read) + except CatchableError as err: + await sendErrorResponse(peer, conn, ServerError, err.msg) - let processingFut = if useSnappy: - s.executePipeline(uncompressFramedStream, - readSszValue MsgType) else: - s.readSszValue MsgType + let deadline = sleepAsync RESP_TIMEOUT + var msgBytes = await readMsgBytes(conn, false, deadline) - try: - await processingFut or deadline - except SerializationError as err: - await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg")) - return - except SnappyError as err: - await sendErrorResponse(peer, conn, InvalidRequest, err.msg) - return + if msgBytes.len == 0: + await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg) + return - if not processingFut.finished: - processingFut.cancel() - await sendErrorResponse(peer, conn, InvalidRequest, - "Request full data not sent in time") - return + if useSnappy: + msgBytes = framingFormatUncompress(msgBytes) - if processingFut.error != nil: - await sendErrorResponse(peer, conn, ServerError, - processingFut.error.msg) - return + var msg: MsgRec + try: + msg = decode(SSZ, msgBytes, MsgRec) + except SerializationError as err: + await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg")) + return + except Exception as err: + # TODO. This is temporary code that should be removed after interop. + # It can be enabled only in certain diagnostic builds where it should + # re-raise the exception. + debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName + await sendErrorResponse(peer, conn, ServerError, err.msg) + raise err - try: - logReceivedMsg(peer, processingFut.read) - await callUserHandler(peer, conn, processingFut.read) - except CatchableError as err: - await sendErrorResponse(peer, conn, ServerError, err.msg) + try: + logReceivedMsg(peer, MsgType(msg)) + await callUserHandler(peer, conn, msg) + except CatchableError as err: + await sendErrorResponse(peer, conn, ServerError, err.msg) except CatchableError as err: debug "Error processing an incoming request", err = err.msg @@ -859,7 +789,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = proc sszThunk(`streamVar`: `Connection`, proto: string): Future[void] {.gcsafe.} = return handleIncomingStream(`networkVar`, `streamVar`, false, - `MsgRecName`) + `MsgStrongRecName`) mount `networkVar`.switch, LPProtocol(codec: `codecNameLit` & "ssz", @@ -868,7 +798,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = proc snappyThunk(`streamVar`: `Connection`, proto: string): Future[void] {.gcsafe.} = return handleIncomingStream(`networkVar`, `streamVar`, true, - `MsgRecName`) + `MsgStrongRecName`) mount `networkVar`.switch, LPProtocol(codec: `codecNameLit` & "ssz_snappy", diff --git a/beacon_chain/faststreams_backend.nim b/beacon_chain/faststreams_backend.nim new file mode 100644 index 000000000..3da1957bc --- /dev/null +++ b/beacon_chain/faststreams_backend.nim @@ -0,0 +1,118 @@ +type + LibP2PInputStream = ref object of InputStream + conn: Connection + +const + closingErrMsg = "Failed to close LibP2P stream" + readingErrMsg = "Failed to read from LibP2P stream" + +proc libp2pReadOnce(s: LibP2PInputStream, + dst: pointer, dstLen: Natural): Future[Natural] {.async.} = + fsTranslateErrors readingErrMsg: + try: + return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {}, + readStartAddr, readLen): + await s.conn.readOnce(readStartAddr, readLen) + except LPStreamEOFError: + s.buffers.eofReached = true + +proc libp2pCloseWait(s: LibP2PInputStream) {.async.} = + fsTranslateErrors closingErrMsg: + await safeClose(s.conn) + +# TODO: Use the Raising type here +let libp2pInputVTable = InputStreamVTable( + readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural + {.nimcall, gcsafe, raises: [IOError, Defect].} = + doAssert(false, "synchronous reading is not allowed") + , + readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural] + {.nimcall, gcsafe, raises: [IOError, Defect].} = + fsTranslateErrors "Unexpected exception from merely forwarding a future": + return libp2pReadOnce(Libp2pInputStream s, dst, dstLen) + , + closeSync: proc (s: InputStream) + {.nimcall, gcsafe, raises: [IOError, Defect].} = + fsTranslateErrors closingErrMsg: + s.closeFut = Libp2pInputStream(s).conn.close() + , + closeAsync: proc (s: InputStream): Future[void] + {.nimcall, gcsafe, raises: [IOError, Defect].} = + fsTranslateErrors "Unexpected exception from merely forwarding a future": + return libp2pCloseWait(Libp2pInputStream s) +) + +func libp2pInput*(conn: Connection, + pageSize = defaultPageSize): AsyncInputStream = + AsyncInputStream LibP2PInputStream( + vtable: vtableAddr libp2pInputVTable, + buffers: initPageBuffers(pageSize), + conn: conn) + +proc readSizePrefix(s: AsyncInputStream, maxSize: uint64): Future[int] {.async.} = + trace "about to read msg size prefix" + var parser: VarintParser[uint64, ProtoBuf] + while s.readable: + case parser.feedByte(s.read) + of Done: + let res = parser.getResult + if res > maxSize: + trace "size prefix outside of range", res + return -1 + else: + trace "got size prefix", res + return int(res) + of Overflow: + trace "size prefix overflow" + return -1 + of Incomplete: + continue + +proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} = + let size = await s.readSizePrefix(uint64(MAX_CHUNK_SIZE)) + if size > 0 and s.readable(size): + s.withReadableRange(size, r): + return r.readValue(SSZ, MsgType) + else: + raise newException(CatchableError, + "Failed to read an incoming message size prefix") + +proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} = + if s.readable: + let responseCode = s.read + static: assert responseCode.type.low == 0 + if responseCode > ResponseCode.high.byte: + return err("Invalid response code") + + case ResponseCode(responseCode): + of InvalidRequest, ServerError: + return err(await s.readSszValue(string)) + of Success: + return ok true + else: + return ok false + +proc readChunk(s: AsyncInputStream, + MsgType: typedesc): Future[Option[MsgType]] {.async.} = + let rc = await s.readResponseCode() + if rc.isOk: + if rc[]: + return some(await readSszValue(s, MsgType)) + else: + trace "Failed to read response code", + reason = rc.error + +proc readResponse(s: AsyncInputStream, + MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} = + when MsgType is seq: + type E = ElemType(MsgType) + var results: MsgType + while true: + let nextRes = await s.readChunk(E) + if nextRes.isNone: break + results.add nextRes.get + if results.len > 0: + return some(results) + else: + return await s.readChunk(MsgType) + diff --git a/beacon_chain/libp2p_streams_backend.nim b/beacon_chain/libp2p_streams_backend.nim new file mode 100644 index 000000000..efc269f86 --- /dev/null +++ b/beacon_chain/libp2p_streams_backend.nim @@ -0,0 +1,188 @@ +# TODO: How can this be tested? +proc uncompressFramedStream*(conn: Connection, output: OutputStream): Future[Result[void, cstring]] {.async.} = + var header: array[STREAM_HEADER.len, byte] + try: + await conn.readExactly(addr header[0], header.len) + except LPStreamEOFError: + return err "Unexpected EOF before snappy header" + + if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.len-1): + return err "Incorrect snappy header" + + var uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN) + + while true: + var frameHeader: array[4, byte] + try: + await conn.readExactly(addr frameHeader[0], frameHeader.len) + except LPStreamEOFError: + return ok() + + let x = uint32.fromBytesLE frameHeader + let id = x and 0xFF + let dataLen = (x shr 8).int + + if dataLen > MAX_COMPRESSED_DATA_LEN: + return err "invalid snappy frame length" + + var frameData = newSeq[byte](dataLen) + try: + await conn.readExactly(addr frameData[0], dataLen) + except LPStreamEOFError: + return err "Incomplete snappy frame" + + if id == COMPRESSED_DATA_IDENTIFIER: + if dataLen < 4: + return err "Snappy frame size too low to contain CRC checksum" + + let + crc = uint32.fromBytesLE frameData[0..3] + uncompressedLen = snappyUncompress(frameData.toOpenArray(4, frameData.len - 1), uncompressedData) + + if uncompressedLen <= 0: + return err "Failed to decompress snappy frame" + + if not checkCrcAndAppend(output, uncompressedData.toOpenArray(0, uncompressedLen-1), crc): + return err "Snappy content CRC checksum failed" + + elif id == UNCOMPRESSED_DATA_IDENTIFIER: + if dataLen < 4: + return err "Snappy frame size too low to contain CRC checksum" + + let crc = uint32.fromBytesLE frameData[0..3] + if not checkCrcAndAppend(output, frameData.toOpenArray(4, frameData.len - 1), crc): + return err "Snappy content CRC checksum failed" + + elif id < 0x80: + # Reserved unskippable chunks (chunk types 0x02-0x7f) + # if we encounter this type of chunk, stop decoding + # the spec says it is an error + return err "Invalid snappy chunk type" + + else: + # Reserved skippable chunks (chunk types 0x80-0xfe) + # including STREAM_HEADER (0xff) should be skipped + continue + +proc readChunk(conn: Connection, + MsgType: type, + withResponseCode: bool, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.} + +proc readSizePrefix(conn: Connection, + deadline: Future[void]): Future[int] {.async.} = + trace "about to read msg size prefix" + var parser: VarintParser[uint64, ProtoBuf] + while true: + var nextByte: byte + var readNextByte = conn.readExactly(addr nextByte, 1) + await readNextByte or deadline + if not readNextByte.finished: + trace "size prefix byte not received in time" + return -1 + case parser.feedByte(nextByte) + of Done: + let res = parser.getResult + if res > uint64(MAX_CHUNK_SIZE): + trace "size prefix outside of range", res + return -1 + else: + trace "got size prefix", res + return int(res) + of Overflow: + trace "size prefix overflow" + return -1 + of Incomplete: + continue + +proc readMsgBytes(conn: Connection, + withResponseCode: bool, + deadline: Future[void]): Future[Bytes] {.async.} = + trace "about to read message bytes", withResponseCode + + try: + if withResponseCode: + var responseCode: byte + trace "about to read response code" + var readResponseCode = conn.readExactly(addr responseCode, 1) + try: + await readResponseCode or deadline + except LPStreamEOFError: + trace "end of stream received" + return + + if not readResponseCode.finished: + trace "response code not received in time" + return + + if responseCode > ResponseCode.high.byte: + trace "invalid response code", responseCode + return + + logScope: responseCode = ResponseCode(responseCode) + trace "got response code" + + case ResponseCode(responseCode) + of InvalidRequest, ServerError: + let responseErrMsg = await conn.readChunk(string, false, deadline) + debug "P2P request resulted in error", responseErrMsg + return + + of Success: + # The response is OK, the execution continues below + discard + + var sizePrefix = await conn.readSizePrefix(deadline) + trace "got msg size prefix", sizePrefix + + if sizePrefix == -1: + debug "Failed to read an incoming message size prefix", peer = conn.peerId + return + + if sizePrefix == 0: + debug "Received SSZ with zero size", peer = conn.peerId + return + + trace "about to read msg bytes", len = sizePrefix + var msgBytes = newSeq[byte](sizePrefix) + var readBody = conn.readExactly(addr msgBytes[0], sizePrefix) + await readBody or deadline + if not readBody.finished: + trace "msg bytes not received in time" + return + + trace "got message bytes", len = sizePrefix + return msgBytes + + except TransportIncompleteError: + return @[] + +proc readChunk(conn: Connection, + MsgType: type, + withResponseCode: bool, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = + var msgBytes = await conn.readMsgBytes(withResponseCode, deadline) + try: + if msgBytes.len > 0: + return some SSZ.decode(msgBytes, MsgType) + except SerializationError as err: + debug "Failed to decode a network message", + msgBytes, errMsg = err.formatMsg("") + return + +proc readResponse( + conn: Connection, + MsgType: type, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = + + when MsgType is seq: + type E = ElemType(MsgType) + var results: MsgType + while true: + let nextRes = await conn.readChunk(E, true, deadline) + if nextRes.isNone: break + results.add nextRes.get + if results.len > 0: + return some(results) + else: + return await conn.readChunk(MsgType, true, deadline) diff --git a/vendor/nim-chronicles b/vendor/nim-chronicles index bb2b58881..229beec0e 160000 --- a/vendor/nim-chronicles +++ b/vendor/nim-chronicles @@ -1 +1 @@ -Subproject commit bb2b58881004481a7d790f4d349d35b29cdbcac9 +Subproject commit 229beec0e8ccf81a6dae1f6519ae0bef6d4bd1b5