Make the Snappy FastStreams integration optional by duplicating it for LibP2P streams

This commit is contained in:
Zahary Karadjov 2020-05-13 01:35:40 +03:00 committed by zah
parent 3ecb197635
commit f055fad08a
4 changed files with 398 additions and 162 deletions

View File

@ -168,60 +168,16 @@ declarePublicGauge libp2p_successful_dials,
declarePublicGauge libp2p_peers, declarePublicGauge libp2p_peers,
"Number of active libp2p peers" "Number of active libp2p peers"
type
LibP2PInputStream = ref object of InputStream
conn: Connection
const
closingErrMsg = "Failed to close LibP2P stream"
readingErrMsg = "Failed to read from LibP2P stream"
proc safeClose(conn: Connection) {.async.} = proc safeClose(conn: Connection) {.async.} =
if not conn.closed: if not conn.closed:
await close(conn) await close(conn)
proc libp2pCloseWait(s: LibP2PInputStream) {.async.} = const
fsTranslateErrors closingErrMsg: snappy_implementation {.strdefine.} = "libp2p"
await safeClose(s.conn)
proc libp2pReadOnce(s: LibP2PInputStream, const useNativeSnappy = when snappy_implementation == "native": true
dst: pointer, dstLen: Natural): Future[Natural] {.async.} = elif snappy_implementation == "libp2p": false
fsTranslateErrors readingErrMsg: else: {.fatal: "Please set snappy_implementation to either 'libp2p' or 'native'".}
try:
return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {},
readStartAddr, readLen):
await s.conn.readOnce(readStartAddr, readLen)
except LPStreamEOFError:
s.buffers.eofReached = true
# TODO: Use the Raising type here
let libp2pInputVTable = InputStreamVTable(
readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural
{.nimcall, gcsafe, raises: [IOError, Defect].} =
doAssert(false, "synchronous reading is not allowed")
,
readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected exception from merely forwarding a future":
return libp2pReadOnce(Libp2pInputStream s, dst, dstLen)
,
closeSync: proc (s: InputStream)
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors closingErrMsg:
s.closeFut = Libp2pInputStream(s).conn.close()
,
closeAsync: proc (s: InputStream): Future[void]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected exception from merely forwarding a future":
return libp2pCloseWait(Libp2pInputStream s)
)
func libp2pInput*(conn: Connection,
pageSize = defaultPageSize): AsyncInputStream =
AsyncInputStream LibP2PInputStream(
vtable: vtableAddr libp2pInputVTable,
buffers: initPageBuffers(pageSize),
conn: conn)
template libp2pProtocol*(name: string, version: int) {.pragma.} template libp2pProtocol*(name: string, version: int) {.pragma.}
@ -306,73 +262,6 @@ proc disconnectAndRaise(peer: Peer,
await peer.disconnect(r) await peer.disconnect(r)
raisePeerDisconnected(msg, r) raisePeerDisconnected(msg, r)
proc readSizePrefix(s: AsyncInputStream, maxSize: uint64): Future[int] {.async.} =
trace "about to read msg size prefix"
var parser: VarintParser[uint64, ProtoBuf]
while s.readable:
case parser.feedByte(s.read)
of Done:
let res = parser.getResult
if res > maxSize:
trace "size prefix outside of range", res
return -1
else:
trace "got size prefix", res
return int(res)
of Overflow:
trace "size prefix overflow"
return -1
of Incomplete:
continue
proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} =
let size = await s.readSizePrefix(uint64(MAX_CHUNK_SIZE))
if size > 0 and s.readable(size):
s.withReadableRange(size, r):
return r.readValue(SSZ, MsgType)
else:
raise newException(CatchableError,
"Failed to read an incoming message size prefix")
proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} =
if s.readable:
let responseCode = s.read
static: assert responseCode.type.low == 0
if responseCode > ResponseCode.high.byte:
return err("Invalid response code")
case ResponseCode(responseCode):
of InvalidRequest, ServerError:
return err(await s.readSszValue(string))
of Success:
return ok true
else:
return ok false
proc readChunk(s: AsyncInputStream,
MsgType: typedesc): Future[Option[MsgType]] {.async.} =
let rc = await s.readResponseCode()
if rc.isOk:
if rc[]:
return some(await readSszValue(s, MsgType))
else:
trace "Failed to read response code",
reason = rc.error
proc readResponse(s: AsyncInputStream,
MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await s.readChunk(E)
if nextRes.isNone: break
results.add nextRes.get
if results.len > 0:
return some(results)
else:
return await s.readChunk(MsgType)
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes = proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = memoryOutput() var s = memoryOutput()
s.write byte(responseCode) s.write byte(responseCode)
@ -441,22 +330,31 @@ proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.
let bytes = s.getOutput let bytes = s.getOutput
await responder.stream.write(bytes) await responder.stream.write(bytes)
when useNativeSnappy:
include faststreams_backend
else:
include libp2p_streams_backend
template awaitWithTimeout[T](operation: Future[T],
deadline: Future[void],
onTimeout: untyped): T =
let f = operation
await f or deadline
if not f.finished:
cancel f
onTimeout
else:
f.read
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type, ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} = timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var var
deadline = sleepAsync timeout deadline = sleepAsync timeout
protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz") protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline
if not streamFut.finished:
streamFut.cancel()
return none(ResponseMsg)
let stream = streamFut.read
let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
deadline): return none(ResponseMsg)
try: try:
# Send the request # Send the request
var s = memoryOutput() var s = memoryOutput()
@ -469,8 +367,11 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
await stream.write(bytes) await stream.write(bytes)
# Read the response # Read the response
let response = libp2pInput(stream) when useNativeSnappy:
return await response.readResponse(ResponseMsg) return awaitWithTimeout(readResponse(libp2pInput(stream), ResponseMsg),
deadline, none(ResponseMsg))
else:
return await readResponse(stream, ResponseMsg, deadline)
finally: finally:
await safeClose(stream) await safeClose(stream)
@ -553,6 +454,8 @@ proc handleIncomingStream(network: Eth2Node,
useSnappy: bool, useSnappy: bool,
MsgType: type) {.async, gcsafe.} = MsgType: type) {.async, gcsafe.} =
mixin callUserHandler, RecType mixin callUserHandler, RecType
type MsgRec = RecType(MsgType)
const msgName = typetraits.name(MsgType) const msgName = typetraits.name(MsgType)
## Uncomment this to enable tracing on all incoming requests ## Uncomment this to enable tracing on all incoming requests
@ -566,46 +469,73 @@ proc handleIncomingStream(network: Eth2Node,
try: try:
let peer = peerFromStream(network, conn) let peer = peerFromStream(network, conn)
let s = libp2pInput(conn) when useNativeSnappy:
let s = libp2pInput(conn)
if s.timeoutToNextByte(TTFB_TIMEOUT): if s.timeoutToNextByte(TTFB_TIMEOUT):
await sendErrorResponse(peer, conn, InvalidRequest, await sendErrorResponse(peer, conn, InvalidRequest,
"Request first byte not sent in time") "Request first byte not sent in time")
return return
let deadline = sleepAsync RESP_TIMEOUT let deadline = sleepAsync RESP_TIMEOUT
let processingFut = if useSnappy:
s.executePipeline(uncompressFramedStream,
readSszValue MsgRec)
else:
s.readSszValue MsgRec
try:
await processingFut or deadline
except SerializationError as err:
await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
return
except SnappyError as err:
await sendErrorResponse(peer, conn, InvalidRequest, err.msg)
return
if not processingFut.finished:
processingFut.cancel()
await sendErrorResponse(peer, conn, InvalidRequest,
"Request full data not sent in time")
return
try:
logReceivedMsg(peer, MsgType(processingFut.read))
await callUserHandler(peer, conn, processingFut.read)
except CatchableError as err:
await sendErrorResponse(peer, conn, ServerError, err.msg)
let processingFut = if useSnappy:
s.executePipeline(uncompressFramedStream,
readSszValue MsgType)
else: else:
s.readSszValue MsgType let deadline = sleepAsync RESP_TIMEOUT
var msgBytes = await readMsgBytes(conn, false, deadline)
try: if msgBytes.len == 0:
await processingFut or deadline await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg)
except SerializationError as err: return
await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
return
except SnappyError as err:
await sendErrorResponse(peer, conn, InvalidRequest, err.msg)
return
if not processingFut.finished: if useSnappy:
processingFut.cancel() msgBytes = framingFormatUncompress(msgBytes)
await sendErrorResponse(peer, conn, InvalidRequest,
"Request full data not sent in time")
return
if processingFut.error != nil: var msg: MsgRec
await sendErrorResponse(peer, conn, ServerError, try:
processingFut.error.msg) msg = decode(SSZ, msgBytes, MsgRec)
return except SerializationError as err:
await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
return
except Exception as err:
# TODO. This is temporary code that should be removed after interop.
# It can be enabled only in certain diagnostic builds where it should
# re-raise the exception.
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
await sendErrorResponse(peer, conn, ServerError, err.msg)
raise err
try: try:
logReceivedMsg(peer, processingFut.read) logReceivedMsg(peer, MsgType(msg))
await callUserHandler(peer, conn, processingFut.read) await callUserHandler(peer, conn, msg)
except CatchableError as err: except CatchableError as err:
await sendErrorResponse(peer, conn, ServerError, err.msg) await sendErrorResponse(peer, conn, ServerError, err.msg)
except CatchableError as err: except CatchableError as err:
debug "Error processing an incoming request", err = err.msg debug "Error processing an incoming request", err = err.msg
@ -859,7 +789,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
proc sszThunk(`streamVar`: `Connection`, proc sszThunk(`streamVar`: `Connection`,
proto: string): Future[void] {.gcsafe.} = proto: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`, false, return handleIncomingStream(`networkVar`, `streamVar`, false,
`MsgRecName`) `MsgStrongRecName`)
mount `networkVar`.switch, mount `networkVar`.switch,
LPProtocol(codec: `codecNameLit` & "ssz", LPProtocol(codec: `codecNameLit` & "ssz",
@ -868,7 +798,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
proc snappyThunk(`streamVar`: `Connection`, proc snappyThunk(`streamVar`: `Connection`,
proto: string): Future[void] {.gcsafe.} = proto: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`, true, return handleIncomingStream(`networkVar`, `streamVar`, true,
`MsgRecName`) `MsgStrongRecName`)
mount `networkVar`.switch, mount `networkVar`.switch,
LPProtocol(codec: `codecNameLit` & "ssz_snappy", LPProtocol(codec: `codecNameLit` & "ssz_snappy",

View File

@ -0,0 +1,118 @@
type
  # Adapter exposing a LibP2P Connection through the faststreams
  # InputStream interface (async reads only — see the vtable below).
  LibP2PInputStream = ref object of InputStream
    conn: Connection  # underlying LibP2P connection the stream reads from

const
  # Error messages used when translating stream failures into IOError.
  closingErrMsg = "Failed to close LibP2P stream"
  readingErrMsg = "Failed to read from LibP2P stream"
# Single-shot read adapter: pulls at most `dstLen` bytes from the LibP2P
# connection into `dst`, going through the faststreams page buffers.
proc libp2pReadOnce(s: LibP2PInputStream,
                    dst: pointer, dstLen: Natural): Future[Natural] {.async.} =
  # fsTranslateErrors re-raises any unexpected failure as an IOError
  # carrying `readingErrMsg`.
  fsTranslateErrors readingErrMsg:
    try:
      # implementSingleRead injects `readStartAddr`/`readLen` describing the
      # region to fill; the body returns the number of bytes actually read.
      return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {},
                                 readStartAddr, readLen):
        await s.conn.readOnce(readStartAddr, readLen)
    except LPStreamEOFError:
      # EOF is not an error: mark the buffers so readers observe end-of-input.
      s.buffers.eofReached = true
# Asynchronously closes the underlying connection (no-op if already closed,
# via safeClose), translating failures into IOError with `closingErrMsg`.
proc libp2pCloseWait(s: LibP2PInputStream) {.async.} =
  fsTranslateErrors closingErrMsg:
    await safeClose(s.conn)
# TODO: Use the Raising type here
# VTable wiring a LibP2P connection into the faststreams InputStream
# interface. LibP2P connections are async-only, so the synchronous read
# entry point is a hard programming error.
#
# Fix: the original spelled the type `Libp2pInputStream` here while it is
# declared as `LibP2PInputStream`; Nim's style-insensitivity makes that
# compile, but it breaks `--styleCheck:error` and misleads readers. The
# spelling is normalized to the declared form — no behavior change.
let libp2pInputVTable = InputStreamVTable(
  readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural
                 {.nimcall, gcsafe, raises: [IOError, Defect].} =
    doAssert(false, "synchronous reading is not allowed")
  ,
  readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural]
                  {.nimcall, gcsafe, raises: [IOError, Defect].} =
    fsTranslateErrors "Unexpected exception from merely forwarding a future":
      return libp2pReadOnce(LibP2PInputStream s, dst, dstLen)
  ,
  closeSync: proc (s: InputStream)
                  {.nimcall, gcsafe, raises: [IOError, Defect].} =
    fsTranslateErrors closingErrMsg:
      # Fire-and-forget: closeSync cannot await, so the close future is
      # stashed for the stream machinery to track.
      # NOTE(review): unlike closeAsync this calls `close` directly rather
      # than `safeClose`, so it may act on an already-closed connection —
      # confirm that `close` is idempotent here.
      s.closeFut = LibP2PInputStream(s).conn.close()
  ,
  closeAsync: proc (s: InputStream): Future[void]
                   {.nimcall, gcsafe, raises: [IOError, Defect].} =
    fsTranslateErrors "Unexpected exception from merely forwarding a future":
      return libp2pCloseWait(LibP2PInputStream s)
)
func libp2pInput*(conn: Connection,
                  pageSize = defaultPageSize): AsyncInputStream =
  ## Wraps a LibP2P connection as an async faststreams input stream,
  ## backed by page buffers of `pageSize` bytes.
  let adapter = LibP2PInputStream(
    vtable: vtableAddr libp2pInputVTable,
    buffers: initPageBuffers(pageSize),
    conn: conn)
  AsyncInputStream adapter
proc readSizePrefix(s: AsyncInputStream, maxSize: uint64): Future[int] {.async.} =
  ## Reads a protobuf varint length prefix from `s`.
  ## Returns the decoded size, or -1 when the prefix overflows, exceeds
  ## `maxSize`, or the stream ends before the varint is complete.
  ##
  ## Fix: the original fell off the `while s.readable` loop on a truncated
  ## stream and implicitly returned 0, which is indistinguishable from a
  ## legitimate zero-length prefix; all other failures return -1.
  trace "about to read msg size prefix"
  var parser: VarintParser[uint64, ProtoBuf]
  while s.readable:
    case parser.feedByte(s.read)
    of Done:
      let res = parser.getResult
      if res > maxSize:
        trace "size prefix outside of range", res
        return -1
      else:
        trace "got size prefix", res
        return int(res)
    of Overflow:
      trace "size prefix overflow"
      return -1
    of Incomplete:
      continue
  # Stream ended mid-varint: report failure explicitly.
  trace "stream ended before size prefix was complete"
  return -1
# Reads one size-prefixed SSZ value: a varint length (bounded by
# MAX_CHUNK_SIZE) followed by that many bytes decoded as `MsgType`.
# Raises CatchableError when the prefix or the body cannot be read.
proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} =
  let size = await s.readSizePrefix(uint64(MAX_CHUNK_SIZE))
  if size > 0 and s.readable(size):
    # Restrict the stream to exactly `size` bytes while decoding.
    s.withReadableRange(size, r):
      return r.readValue(SSZ, MsgType)
  else:
    # NOTE(review): this message also fires when the prefix was valid but
    # the body bytes never arrived — consider a more precise message.
    raise newException(CatchableError,
      "Failed to read an incoming message size prefix")
# Reads the single response-code byte of an incoming response chunk.
#   ok(true)  - Success: an SSZ payload follows on the stream.
#   ok(false) - the stream is not readable (no further chunks).
#   err(msg)  - out-of-range code, or the server-reported error string
#               (decoded from the stream for InvalidRequest/ServerError).
proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} =
  if s.readable:
    let responseCode = s.read
    # Compile-time guard: range check below assumes the codes start at 0.
    static: assert responseCode.type.low == 0
    if responseCode > ResponseCode.high.byte:
      return err("Invalid response code")
    case ResponseCode(responseCode):
    of InvalidRequest, ServerError:
      # Error responses carry an SSZ-encoded human-readable string.
      return err(await s.readSszValue(string))
    of Success:
      return ok true
  else:
    return ok false
proc readChunk(s: AsyncInputStream,
               MsgType: typedesc): Future[Option[MsgType]] {.async.} =
  ## Reads one response chunk: a response code, then (on Success) an
  ## SSZ-encoded `MsgType`. Returns none on end-of-stream or error.
  let responseCode = await s.readResponseCode()
  if responseCode.isErr:
    trace "Failed to read response code",
          reason = responseCode.error
    return
  if responseCode[]:
    return some(await readSszValue(s, MsgType))
proc readResponse(s: AsyncInputStream,
                  MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} =
  ## Reads a complete response. For seq result types, chunks are collected
  ## until the stream ends (none when no chunk arrived at all); otherwise a
  ## single chunk is read.
  when MsgType is seq:
    type Elem = ElemType(MsgType)
    var collected: MsgType
    while true:
      let chunk = await s.readChunk(Elem)
      if chunk.isNone:
        break
      collected.add chunk.get
    if collected.len > 0:
      return some(collected)
  else:
    return await s.readChunk(MsgType)

View File

@ -0,0 +1,188 @@
# TODO: How can this be tested?
proc uncompressFramedStream*(conn: Connection, output: OutputStream): Future[Result[void, cstring]] {.async.} =
  ## Reads a snappy framed stream from `conn` until EOF, verifying the
  ## stream header and each frame's CRC, and appending the decompressed
  ## bytes to `output`. Returns ok() on clean EOF at a frame boundary and
  ## err(...) on any malformed input.
  ##
  ## Fix: the original unconditionally did `addr frameData[0]` which raises
  ## on an empty seq; zero-length skippable frames are legal in the snappy
  ## framing format, so the body read is now guarded by `dataLen > 0`.
  var header: array[STREAM_HEADER.len, byte]
  try:
    await conn.readExactly(addr header[0], header.len)
  except LPStreamEOFError:
    return err "Unexpected EOF before snappy header"

  if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.len-1):
    return err "Incorrect snappy header"

  # Scratch buffer reused across frames for decompressed payloads.
  var uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)

  while true:
    var frameHeader: array[4, byte]
    try:
      await conn.readExactly(addr frameHeader[0], frameHeader.len)
    except LPStreamEOFError:
      # EOF between frames is the normal end of the stream.
      return ok()

    # Frame header layout: byte 0 = chunk type, bytes 1-3 = LE payload length.
    let x = uint32.fromBytesLE frameHeader
    let id = x and 0xFF
    let dataLen = (x shr 8).int

    if dataLen > MAX_COMPRESSED_DATA_LEN:
      return err "invalid snappy frame length"

    var frameData = newSeq[byte](dataLen)
    if dataLen > 0:
      try:
        await conn.readExactly(addr frameData[0], dataLen)
      except LPStreamEOFError:
        return err "Incomplete snappy frame"

    if id == COMPRESSED_DATA_IDENTIFIER:
      if dataLen < 4:
        return err "Snappy frame size too low to contain CRC checksum"
      let
        # First 4 bytes of the payload are the masked CRC of the plain data.
        crc = uint32.fromBytesLE frameData[0..3]
        uncompressedLen = snappyUncompress(frameData.toOpenArray(4, frameData.len - 1), uncompressedData)
      if uncompressedLen <= 0:
        return err "Failed to decompress snappy frame"
      if not checkCrcAndAppend(output, uncompressedData.toOpenArray(0, uncompressedLen-1), crc):
        return err "Snappy content CRC checksum failed"
    elif id == UNCOMPRESSED_DATA_IDENTIFIER:
      if dataLen < 4:
        return err "Snappy frame size too low to contain CRC checksum"
      let crc = uint32.fromBytesLE frameData[0..3]
      if not checkCrcAndAppend(output, frameData.toOpenArray(4, frameData.len - 1), crc):
        return err "Snappy content CRC checksum failed"
    elif id < 0x80:
      # Reserved unskippable chunks (chunk types 0x02-0x7f)
      # if we encounter this type of chunk, stop decoding
      # the spec says it is an error
      return err "Invalid snappy chunk type"
    else:
      # Reserved skippable chunks (chunk types 0x80-0xfe)
      # including STREAM_HEADER (0xff) should be skipped
      continue
# Forward declaration: readChunk and readMsgBytes are mutually recursive
# (readMsgBytes decodes error-response payloads via readChunk below).
proc readChunk(conn: Connection,
               MsgType: type,
               withResponseCode: bool,
               deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(conn: Connection,
                    deadline: Future[void]): Future[int] {.async.} =
  ## Reads a protobuf varint size prefix byte-by-byte from `conn`, racing
  ## each byte against `deadline`. Returns the decoded size, or -1 on
  ## timeout, varint overflow, or a prefix larger than MAX_CHUNK_SIZE.
  ##
  ## Fix: on timeout the original left the pending readExactly future
  ## dangling on the connection; it is now cancelled, matching the
  ## awaitWithTimeout pattern used elsewhere in this module.
  trace "about to read msg size prefix"
  var parser: VarintParser[uint64, ProtoBuf]
  while true:
    var nextByte: byte
    var readNextByte = conn.readExactly(addr nextByte, 1)
    await readNextByte or deadline
    if not readNextByte.finished:
      readNextByte.cancel()
      trace "size prefix byte not received in time"
      return -1
    case parser.feedByte(nextByte)
    of Done:
      let res = parser.getResult
      if res > uint64(MAX_CHUNK_SIZE):
        trace "size prefix outside of range", res
        return -1
      else:
        trace "got size prefix", res
        return int(res)
    of Overflow:
      trace "size prefix overflow"
      return -1
    of Incomplete:
      continue
# Reads one length-prefixed message from `conn`, optionally preceded by a
# response-code byte (`withResponseCode`). Every read is raced against
# `deadline`. On any failure (timeout, invalid code, server error, bad
# prefix) an empty seq is returned and the reason is logged.
proc readMsgBytes(conn: Connection,
                  withResponseCode: bool,
                  deadline: Future[void]): Future[Bytes] {.async.} =
  trace "about to read message bytes", withResponseCode
  try:
    if withResponseCode:
      var responseCode: byte
      trace "about to read response code"
      var readResponseCode = conn.readExactly(addr responseCode, 1)
      try:
        await readResponseCode or deadline
      except LPStreamEOFError:
        # Clean end-of-stream before any response code: no message.
        trace "end of stream received"
        return
      if not readResponseCode.finished:
        # NOTE(review): the pending read is not cancelled on timeout —
        # confirm it is safe to leave it racing on the connection.
        trace "response code not received in time"
        return
      if responseCode > ResponseCode.high.byte:
        trace "invalid response code", responseCode
        return
      logScope: responseCode = ResponseCode(responseCode)
      trace "got response code"
      case ResponseCode(responseCode)
      of InvalidRequest, ServerError:
        # Error responses carry an SSZ string payload describing the failure.
        let responseErrMsg = await conn.readChunk(string, false, deadline)
        debug "P2P request resulted in error", responseErrMsg
        return
      of Success:
        # The response is OK, the execution continues below
        discard

    var sizePrefix = await conn.readSizePrefix(deadline)
    trace "got msg size prefix", sizePrefix

    if sizePrefix == -1:
      debug "Failed to read an incoming message size prefix", peer = conn.peerId
      return
    if sizePrefix == 0:
      debug "Received SSZ with zero size", peer = conn.peerId
      return

    trace "about to read msg bytes", len = sizePrefix
    var msgBytes = newSeq[byte](sizePrefix)
    var readBody = conn.readExactly(addr msgBytes[0], sizePrefix)
    await readBody or deadline
    if not readBody.finished:
      # NOTE(review): body read future is also left uncancelled on timeout.
      trace "msg bytes not received in time"
      return

    trace "got message bytes", len = sizePrefix
    return msgBytes
  except TransportIncompleteError:
    # Connection dropped mid-read: treat as "no message".
    return @[]
proc readChunk(conn: Connection,
               MsgType: type,
               withResponseCode: bool,
               deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
  ## Reads one message with readMsgBytes and SSZ-decodes it as `MsgType`.
  ## Returns none when no bytes arrived or decoding failed.
  let payload = await conn.readMsgBytes(withResponseCode, deadline)
  if payload.len == 0:
    return
  try:
    return some SSZ.decode(payload, MsgType)
  except SerializationError as err:
    debug "Failed to decode a network message",
          msgBytes = payload, errMsg = err.formatMsg("<msg>")
proc readResponse(
       conn: Connection,
       MsgType: type,
       deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
  ## Reads a complete response from `conn`. For seq result types, chunks
  ## are collected until none arrives (returning none when the first chunk
  ## is already missing); otherwise a single chunk is read.
  when MsgType is seq:
    type Elem = ElemType(MsgType)
    var collected: MsgType
    while true:
      let chunk = await conn.readChunk(Elem, true, deadline)
      if chunk.isNone:
        break
      collected.add chunk.get
    if collected.len > 0:
      return some(collected)
  else:
    return await conn.readChunk(MsgType, true, deadline)

@ -1 +1 @@
Subproject commit bb2b58881004481a7d790f4d349d35b29cdbcac9 Subproject commit 229beec0e8ccf81a6dae1f6519ae0bef6d4bd1b5