From 9538b60704c0efba555e335270d3420bd0fda763 Mon Sep 17 00:00:00 2001
From: Zahary Karadjov
Date: Thu, 7 May 2020 01:24:55 +0300
Subject: [PATCH] Integrate the async Snappy implementation

---
 beacon_chain/eth2_network.nim | 303 +++++++++++++++++-----------------
 vendor/nim-faststreams        |   2 +-
 vendor/nim-serialization      |   2 +-
 3 files changed, 154 insertions(+), 153 deletions(-)

diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim
index d4a0da9e3..6cf89d16b 100644
--- a/beacon_chain/eth2_network.nim
+++ b/beacon_chain/eth2_network.nim
@@ -5,7 +5,7 @@ import
   # Status libs
   stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
-  faststreams/outputs, snappy, snappy/framing,
+  faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
   json_serialization, json_serialization/std/[net, options],
   chronos, chronicles, metrics,
   # TODO: create simpler to use libp2p modules that use re-exports
@@ -168,6 +168,61 @@ declarePublicGauge libp2p_successful_dials,
 declarePublicGauge libp2p_peers, "Number of active libp2p peers"

+type
+  LibP2PInputStream = ref object of InputStream
+    conn: Connection
+
+const
+  closingErrMsg = "Failed to close LibP2P stream"
+  readingErrMsg = "Failed to read from LibP2P stream"
+
+proc safeClose(conn: Connection) {.async.} =
+  if not conn.closed:
+    await close(conn)
+
+proc libp2pCloseWait(s: LibP2PInputStream) {.async.} =
+  fsTranslateErrors closingErrMsg:
+    await safeClose(s.conn)
+
+proc libp2pReadOnce(s: LibP2PInputStream,
+                    dst: pointer, dstLen: Natural): Future[Natural] {.async.} =
+  fsTranslateErrors readingErrMsg:
+    try:
+      return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {},
+                                 readStartAddr, readLen):
+        await s.conn.readOnce(readStartAddr, readLen)
+    except LPStreamEOFError:
+      s.buffers.eofReached = true
+
+# TODO: Use the Raising type here
+let libp2pInputVTable = InputStreamVTable(
+  readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural
+                 {.nimcall, gcsafe, raises: [IOError, Defect].} =
+    doAssert(false, "synchronous reading is not allowed")
+  ,
+  readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural]
+                  {.nimcall, gcsafe, raises: [IOError, Defect].} =
+    fsTranslateErrors "Unexpected exception from merely forwarding a future":
+      return libp2pReadOnce(Libp2pInputStream s, dst, dstLen)
+  ,
+  closeSync: proc (s: InputStream)
+                  {.nimcall, gcsafe, raises: [IOError, Defect].} =
+    fsTranslateErrors closingErrMsg:
+      s.closeFut = Libp2pInputStream(s).conn.close()
+  ,
+  closeAsync: proc (s: InputStream): Future[void]
+                   {.nimcall, gcsafe, raises: [IOError, Defect].} =
+    fsTranslateErrors "Unexpected exception from merely forwarding a future":
+      return libp2pCloseWait(Libp2pInputStream s)
+)
+
+func libp2pInput*(conn: Connection,
+                  pageSize = defaultPageSize): AsyncInputStream =
+  AsyncInputStream LibP2PInputStream(
+    vtable: vtableAddr libp2pInputVTable,
+    buffers: initPageBuffers(pageSize),
+    conn: conn)
+
 template libp2pProtocol*(name: string, version: int) {.pragma.}

 template `$`*(peer: Peer): string = id(peer.info)
@@ -222,10 +277,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
     peer.network.peerPool.release(peer)
     peer.info.close()

-proc safeClose(conn: Connection) {.async.} =
-  if not conn.closed:
-    await close(conn)
-
 include eth/p2p/p2p_backends_helpers
 include eth/p2p/p2p_tracing

@@ -255,23 +306,11 @@ proc disconnectAndRaise(peer: Peer,
   await peer.disconnect(r)
   raisePeerDisconnected(msg, r)

-proc readChunk(conn: Connection,
-               MsgType: type,
-               withResponseCode: bool,
-               deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
-
-proc readSizePrefix(conn: Connection,
-                    deadline: Future[void]): Future[int] {.async.} =
+proc readSizePrefix(s: AsyncInputStream): Future[int] {.async.} =
   trace "about to read msg size prefix"
   var parser: VarintParser[uint64, ProtoBuf]
-  while true:
-    var nextByte: byte
-    var readNextByte = conn.readExactly(addr nextByte, 1)
-    await readNextByte or deadline
-    if not readNextByte.finished:
-      trace "size prefix byte not received in time"
-      return -1
-    case parser.feedByte(nextByte)
+  while s.readable:
+    case parser.feedByte(s.read)
     of Done:
       let res = parser.getResult
       if res > uint64(MAX_CHUNK_SIZE):
@@ -286,117 +325,61 @@ proc readSizePrefix(conn: Connection,
     of Incomplete:
       continue

-proc readMsgBytes(conn: Connection,
-                  withResponseCode: bool,
-                  deadline: Future[void]): Future[Bytes] {.async.} =
-  trace "about to read message bytes", withResponseCode
+proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} =
+  let size = await s.readSizePrefix
+  if size > 0 and s.readable(size):
+    s.nonBlockingReads(ss):
+      return ss.readValue(SSZ, MsgType)
+  else:
+    raise newException(CatchableError,
+      "Failed to read an incoming message size prefix")

-  try:
-    if withResponseCode:
-      var responseCode: byte
-      trace "about to read response code"
-      var readResponseCode = conn.readExactly(addr responseCode, 1)
-      try:
-        await readResponseCode or deadline
-      except LPStreamEOFError:
-        trace "end of stream received"
-        return
+proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} =
+  if s.readable:
+    let responseCode = s.read
+    static: assert responseCode.type.low == 0
+    if responseCode > ResponseCode.high.byte:
+      return err("Invalid response code")

-      if not readResponseCode.finished:
-        trace "response code not received in time"
-        return
+    case ResponseCode(responseCode):
+    of InvalidRequest, ServerError:
+      return err(await s.readSszValue(string))
+    of Success:
+      return ok true
+  else:
+    return ok false

-      if responseCode > ResponseCode.high.byte:
-        trace "invalid response code", responseCode
-        return
-
-      logScope: responseCode = ResponseCode(responseCode)
-      trace "got response code"
-
-      case ResponseCode(responseCode)
-      of InvalidRequest, ServerError:
-        let responseErrMsg = await conn.readChunk(string, false, deadline)
-        debug "P2P request resulted in error", responseErrMsg
-        return
-
-      of Success:
-        # The response is OK, the execution continues below
-        discard
-
-    var sizePrefix = await conn.readSizePrefix(deadline)
-    trace "got msg size prefix", sizePrefix
-
-    if sizePrefix == -1:
-      debug "Failed to read an incoming message size prefix", peer = conn.peerId
-      return
-
-    if sizePrefix == 0:
-      debug "Received SSZ with zero size", peer = conn.peerId
-      return
-
-    trace "about to read msg bytes", len = sizePrefix
-    var msgBytes = newSeq[byte](sizePrefix)
-    var readBody = conn.readExactly(addr msgBytes[0], sizePrefix)
-    await readBody or deadline
-    if not readBody.finished:
-      trace "msg bytes not received in time"
-      return
-
-    trace "got message bytes", len = sizePrefix
-    return msgBytes
-
-  except TransportIncompleteError:
-    return @[]
-
-proc readChunk(conn: Connection,
-               MsgType: type,
-               withResponseCode: bool,
-               deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
-  var msgBytes = await conn.readMsgBytes(withResponseCode, deadline)
-  try:
-    if msgBytes.len > 0:
-      return some SSZ.decode(msgBytes, MsgType)
-  except SerializationError as err:
-    debug "Failed to decode a network message",
-          msgBytes, errMsg = err.formatMsg("")
-    return
-
-proc readResponse(
-       conn: Connection,
-       MsgType: type,
-       deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
+proc readChunk(s: AsyncInputStream,
+               MsgType: typedesc): Future[Option[MsgType]] {.async.} =
+  let rc = await s.readResponseCode()
+  if rc.isOk:
+    if rc[]:
+      return some(await readSszValue(s, MsgType))
+  else:
+    trace "Failed to read response code",
+          reason = rc.error
+
+proc readResponse(s: AsyncInputStream,
+                  MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} =
   when MsgType is seq:
     type E = ElemType(MsgType)
     var results: MsgType
     while true:
-      let nextRes = await conn.readChunk(E, true, deadline)
+      let nextRes = await s.readChunk(E)
       if nextRes.isNone:
         break
       results.add nextRes.get
     if results.len > 0:
       return some(results)
   else:
-    return await conn.readChunk(MsgType, true, deadline)
+    return await s.readChunk(MsgType)

 proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
   var s = memoryOutput()
-  s.append byte(responseCode)
-  s.appendVarint errMsg.len
-  s.appendValue SSZ, errMsg
+  s.write byte(responseCode)
+  s.writeVarint errMsg.len
+  s.writeValue SSZ, errMsg
   s.getOutput

-proc sendErrorResponse(peer: Peer,
-                       conn: Connection,
-                       err: ref SerializationError,
-                       msgName: string,
-                       msgBytes: Bytes) {.async.} =
-  debug "Received an invalid request",
-        peer, msgName, msgBytes, errMsg = err.formatMsg("")
-
-  let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
-  await conn.write(responseBytes)
-  await conn.close()
-
 proc sendErrorResponse(peer: Peer,
                        conn: Connection,
                        responseCode: ResponseCode,
@@ -422,11 +405,11 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
   let stream = streamFut.read
   try:
     var s = memoryOutput()
-    s.appendVarint requestBytes.len.uint64
+    s.writeVarint requestBytes.len.uint64
     if peer.supportsSnappy:
       framing_format_compress(s, requestBytes)
     else:
-      s.append requestBytes
+      s.write requestBytes
     let bytes = s.getOutput
     await stream.write(bytes)
   finally:
@@ -436,24 +419,24 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
 # I hope to reduce this when I increse the reliance on output streams.
 proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
   var s = memoryOutput()
-  s.append byte(Success)
-  s.appendVarint payload.len.uint64
-  s.append payload
+  s.write byte(Success)
+  s.writeVarint payload.len.uint64
+  s.write payload
   let bytes = s.getOutput
   await responder.stream.write(bytes)

 proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
   var s = memoryOutput()
-  s.append byte(Success)
-  s.appendValue SSZ, sizePrefixed(val)
+  s.write byte(Success)
+  s.writeValue SSZ, sizePrefixed(val)
   let bytes = s.getOutput
   await responder.stream.write(bytes)

 proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
   var s = memoryOutput()
   for chunk in chunks:
-    s.append byte(Success)
-    s.appendValue SSZ, sizePrefixed(chunk)
+    s.write byte(Success)
+    s.writeValue SSZ, sizePrefixed(chunk)
   let bytes = s.getOutput
   await responder.stream.write(bytes)

@@ -473,19 +456,21 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
     return none(ResponseMsg)

   let stream = streamFut.read
+
   try:
     # Send the request
     var s = memoryOutput()
-    s.appendVarint requestBytes.len.uint64
+    s.writeVarint requestBytes.len.uint64
     if peer.supportsSnappy:
       framing_format_compress(s, requestBytes)
     else:
-      s.append requestBytes
+      s.write requestBytes
     let bytes = s.getOutput
     await stream.write(bytes)

     # Read the response
-    return await stream.readResponse(ResponseMsg, deadline)
+    let response = libp2pInput(stream)
+    return await response.readResponse(ResponseMsg)
   finally:
     await safeClose(stream)

@@ -563,8 +548,10 @@ proc implementSendProcBody(sendProc: SendProc) =

   sendProc.useStandardBody(nil, nil, sendCallGenerator)

-proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
-                          MsgType, Format: distinct type) {.async, gcsafe.} =
+proc handleIncomingStream(network: Eth2Node,
+                          conn: Connection,
+                          useSnappy: bool,
+                          MsgType: type) {.async, gcsafe.} =
   mixin callUserHandler, RecType
   const msgName = typetraits.name(MsgType)

@@ -576,39 +563,53 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
   # defer: setLogLevel(LogLevel.DEBUG)
   # trace "incoming " & `msgNameLit` & " conn"

-  let peer = peerFromStream(network, conn)
-
   try:
+    let peer = peerFromStream(network, conn)
+
+    let s = libp2pInput(conn)
+
+    if s.timeoutToNextByte(TTFB_TIMEOUT):
+      await sendErrorResponse(peer, conn, InvalidRequest,
+                              "Request first byte not sent in time")
+      return
+
     let deadline = sleepAsync RESP_TIMEOUT
-    var msgBytes = await readMsgBytes(conn, false, deadline)

-    if msgBytes.len == 0:
-      await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg)
-      return
+    let processingFut = if useSnappy:
+      s.executePipeline(uncompressFramedStream,
+                        readSszValue MsgType)
+    else:
+      s.readSszValue MsgType

-    if useSnappy:
-      msgBytes = framingFormatUncompress(msgBytes)
-
-    type MsgRec = RecType(MsgType)
-    var msg: MsgRec
     try:
-      msg = decode(Format, msgBytes, MsgRec)
+      await processingFut or deadline
     except SerializationError as err:
-      await sendErrorResponse(peer, conn, err, msgName, msgBytes)
+      await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
+      return
+    except SnappyError as err:
+      await sendErrorResponse(peer, conn, InvalidRequest, err.msg)
+      return
+
+    if not processingFut.finished:
+      processingFut.cancel()
+      await sendErrorResponse(peer, conn, InvalidRequest,
+                              "Request full data not sent in time")
+      return
+
+    if processingFut.error != nil:
+      await sendErrorResponse(peer, conn, ServerError,
+                              processingFut.error.msg)
       return
-    except Exception as err:
-      # TODO. This is temporary code that should be removed after interop.
-      # It can be enabled only in certain diagnostic builds where it should
-      # re-raise the exception.
-      debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
-      await sendErrorResponse(peer, conn, ServerError, err.msg)
-      raise err

     try:
-      logReceivedMsg(peer, MsgType(msg))
-      await callUserHandler(peer, conn, msg)
+      logReceivedMsg(peer, processingFut.read)
+      await callUserHandler(peer, conn, processingFut.read)
     except CatchableError as err:
       await sendErrorResponse(peer, conn, ServerError, err.msg)
+
+  except CatchableError as err:
+    debug "Error processing an incoming request", err = err.msg
+
   finally:
     await safeClose(conn)

@@ -858,7 +859,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
       proc sszThunk(`streamVar`: `Connection`,
                     proto: string): Future[void] {.gcsafe.} =
         return handleIncomingStream(`networkVar`, `streamVar`, false,
-                                    `MsgStrongRecName`, `Format`)
+                                    `MsgRecName`)

       mount `networkVar`.switch,
             LPProtocol(codec: `codecNameLit` & "ssz",
@@ -867,7 +868,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
       proc snappyThunk(`streamVar`: `Connection`,
                        proto: string): Future[void] {.gcsafe.} =
         return handleIncomingStream(`networkVar`, `streamVar`, true,
-                                    `MsgStrongRecName`, `Format`)
+                                    `MsgRecName`)

       mount `networkVar`.switch,
             LPProtocol(codec: `codecNameLit` & "ssz_snappy",
diff --git a/vendor/nim-faststreams b/vendor/nim-faststreams
index e9f50bc84..8b863f779 160000
--- a/vendor/nim-faststreams
+++ b/vendor/nim-faststreams
@@ -1 +1 @@
-Subproject commit e9f50bc8478b261c734f704d3aac3853bcbe8a30
+Subproject commit 8b863f7798f7e1a6a9266b88e7bca5aa0cb5ed8a
diff --git a/vendor/nim-serialization b/vendor/nim-serialization
index cef0a2b4d..1c0e0adf4 160000
--- a/vendor/nim-serialization
+++ b/vendor/nim-serialization
@@ -1 +1 @@
-Subproject commit cef0a2b4d247bdd9038364f0e41230edf42706d6
+Subproject commit 1c0e0adf459932e45b2a7483f704086cee7c906a
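
Illustrative sketch (not part of the patch): how a request/response caller is expected to consume the new AsyncInputStream wrapper, mirroring the updated makeEth2Request above. It assumes the imports and types of eth2_network.nim; `libp2pInput` and `readResponse` come from the diff, while `SomeResponseType` and the wrapper proc are hypothetical placeholders.

# Hypothetical usage sketch -- not part of the patch.
proc exampleReadResponse(conn: Connection): Future[Option[SomeResponseType]] {.async.} =
  # Wrap the raw libp2p Connection in a faststreams AsyncInputStream,
  # as makeEth2Request now does after writing the request bytes.
  let stream = libp2pInput(conn)
  # readResponse consumes one or more response chunks, each laid out as
  #   <response-code byte> <varint length> <SSZ payload>
  # and returns none() when a chunk cannot be read or the response code
  # signals an error.
  return await stream.readResponse(SomeResponseType)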