remove faststreams backend for eth2_network (#3617)

unmaintained, incompatible with altair+
Jacek Sieka 2022-05-08 09:08:13 +02:00 committed by GitHub
parent fec933432d
commit c6915ed3e0
4 changed files with 196 additions and 370 deletions


@@ -301,13 +301,6 @@ declareHistogram nbc_resolve_time,
"Time(s) used while resolving peer information",
buckets = delayBuckets
const
snappy_implementation {.strdefine.} = "libp2p"
const useNativeSnappy = when snappy_implementation == "native": true
elif snappy_implementation == "libp2p": false
else: {.fatal: "Please set snappy_implementation to either 'libp2p' or 'native'".}
const
libp2p_pki_schemes {.strdefine.} = ""
@@ -638,10 +631,198 @@ template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResult: auto): untyped =
writeChunk(stream, some Success, SSZ.encode(handlerResult))
when useNativeSnappy:
include faststreams_backend
else:
include libp2p_streams_backend
proc uncompressFramedStream*(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]]
{.async.} =
var header: array[framingHeader.len, byte]
try:
await conn.readExactly(addr header[0], header.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Unexpected EOF before snappy header"
if header != framingHeader:
return err "Incorrect snappy header"
static:
doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64
var
frameData = newSeq[byte](maxCompressedFrameDataLen + 4)
output = newSeqUninitialized[byte](expectedSize)
written = 0
while written < expectedSize:
var frameHeader: array[4, byte]
try:
await conn.readExactly(addr frameHeader[0], frameHeader.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Snappy frame header missing"
let (id, dataLen) = decodeFrameHeader(frameHeader)
if dataLen > frameData.len:
# In theory, compressed frames could be bigger and still result in a
# valid, small snappy frame, but this would mean they are not getting
# compressed correctly
return err "Snappy frame too big"
if dataLen > 0:
try:
await conn.readExactly(addr frameData[0], dataLen)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Incomplete snappy frame"
if id == chunkCompressed:
if dataLen < 6: # At least CRC + 2 bytes of frame data
return err "Compressed snappy frame too small"
let
crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
uncompressed =
snappy.uncompress(
frameData.toOpenArray(4, dataLen - 1),
output.toOpenArray(written, output.high)).valueOr:
return err "Failed to decompress content"
if maskedCrc(
output.toOpenArray(written, written + uncompressed-1)) != crc:
return err "Snappy content CRC checksum failed"
written += uncompressed
elif id == chunkUncompressed:
if dataLen < 5: # At least one byte of data
return err "Uncompressed snappy frame too small"
let uncompressed = dataLen - 4
if uncompressed > maxUncompressedFrameDataLen.int:
return err "Snappy frame size too large"
if uncompressed > output.len - written:
return err "Too much data"
let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc:
return err "Snappy content CRC checksum failed"
output[written..<written + uncompressed] =
frameData.toOpenArray(4, dataLen-1)
written += uncompressed
elif id < 0x80:
# Reserved unskippable chunks (chunk types 0x02-0x7f)
# if we encounter this type of chunk, stop decoding
# the spec says it is an error
return err "Invalid snappy chunk type"
else:
# Reserved skippable chunks (chunk types 0x80-0xfe)
# including STREAM_HEADER (0xff) should be skipped
continue
return ok output
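
Note: the 4-byte frame header parsed by decodeFrameHeader above follows the snappy framing format: one chunk-type byte followed by a 3-byte little-endian data length. A minimal self-contained sketch of that layout (decodeFrameHeaderSketch is an illustrative name, not the actual helper):

func decodeFrameHeaderSketch(header: array[4, byte]): (byte, int) =
  ## Snappy framing format: byte 0 is the chunk type (0x00 compressed data,
  ## 0x01 uncompressed data, 0xff stream identifier); bytes 1..3 hold the
  ## chunk data length, little-endian (for data chunks this length includes
  ## the 4-byte masked CRC).
  let id = header[0]
  let dataLen =
    int(header[1]) or (int(header[2]) shl 8) or (int(header[3]) shl 16)
  (id, dataLen)
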
proc readChunkPayload*(conn: Connection, peer: Peer,
maxChunkSize: uint32,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
let sm = now(chronos.Moment)
let size =
try: await conn.readVarint()
except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError
# TODO compiler error - haha, uncaught exception
# Error: unhandled exception: closureiters.nim(322, 17) `c[i].kind == nkType` [AssertionError]
return neterr UnexpectedEOF
except LPStreamIncompleteError:
return neterr UnexpectedEOF
except InvalidVarintError:
return neterr UnexpectedEOF
if size > maxChunkSize:
return neterr SizePrefixOverflow
if size == 0:
return neterr ZeroSizePrefix
# The `size.int` conversion is safe because `size` is bounded to `MAX_CHUNK_SIZE`
let data = await conn.uncompressFramedStream(size.int)
if data.isOk:
# `10` is the maximum size of a varint on the wire, so the error in this
# throughput estimate cannot be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm,
uint64(10 + size))
return ok SSZ.decode(data.get(), MsgType)
else:
debug "Snappy decompression/read failed", msg = $data.error, conn
return neterr InvalidSnappyBytes
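
Note: the payload read here is encoded as varint(uncompressed SSZ length) followed by the snappy-framed SSZ bytes, per the consensus req/resp spec. A hedged sketch of the matching encoder, assuming nim-snappy's encodeFramed is available (the *Sketch procs are illustrative, not part of the codebase):

import snappy  # status-im/nim-snappy, assumed to provide encodeFramed

proc putUvarintSketch(dest: var seq[byte], x: uint64) =
  ## protobuf-style unsigned varint, the format `conn.readVarint()` expects
  var v = x
  while v >= 0x80'u64:
    dest.add byte((v and 0x7f'u64) or 0x80'u64)
    v = v shr 7
  dest.add byte(v)

proc encodeChunkSketch(sszBytes: seq[byte]): seq[byte] =
  ## length prefix of the uncompressed SSZ payload, then the framed compression
  putUvarintSketch(result, sszBytes.len.uint64)
  result.add snappy.encodeFramed(sszBytes)
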
proc readResponseChunk(conn: Connection, peer: Peer, maxChunkSize: uint32,
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
mixin readChunkPayload
try:
var responseCodeByte: byte
try:
await conn.readExactly(addr responseCodeByte, 1)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr PotentiallyExpectedEOF
static: assert ResponseCode.low.ord == 0
if responseCodeByte > ResponseCode.high.byte:
return neterr InvalidResponseCode
let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError, ResourceUnavailable:
let
errorMsgChunk = await readChunkPayload(
conn, peer, maxChunkSize, ErrorMsg)
errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
errorMsgStr = toPrettyString(errorMsg.asSeq)
debug "Error response from peer", responseCode, errMsg = errorMsgStr
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
responseCode: responseCode,
errorMsg: errorMsgStr)
of Success:
discard
return await readChunkPayload(conn, peer, maxChunkSize, MsgType)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr UnexpectedEOF
proc readResponse(conn: Connection, peer: Peer, maxChunkSize: uint32,
MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
# Because we interleave networking with response processing, it may
happen that reading all chunks takes longer than a strict deadline
# timeout would allow, so we allow each chunk a new timeout instead.
# The problem is exacerbated by the large number of round-trips to the
# poll loop that each future along the way causes.
trace "reading chunk", conn
let nextFut = conn.readResponseChunk(peer, maxChunkSize, E)
if not await nextFut.withTimeout(timeout):
return neterr(ReadResponseTimeout)
let nextRes = nextFut.read()
if nextRes.isErr:
if nextRes.error.kind == PotentiallyExpectedEOF:
trace "EOF chunk", conn, err = nextRes.error
return ok results
trace "Error chunk", conn, err = nextRes.error
return err nextRes.error
else:
trace "Got chunk", conn
results.add nextRes.value
else:
let nextFut = conn.readResponseChunk(peer, maxChunkSize, MsgType)
if not await nextFut.withTimeout(timeout):
return neterr(ReadResponseTimeout)
return nextFut.read()
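
Note: the per-chunk timeout above uses chronos' withTimeout, which completes with false when the wrapped future has not finished within the given duration; read() is then only called on a future known to be finished. A standalone illustration of that pattern:

import chronos

proc slowValue(): Future[int] {.async.} =
  await sleepAsync(2.seconds)
  return 42

proc demo() {.async.} =
  let fut = slowValue()
  if not await fut.withTimeout(500.milliseconds):
    echo "timed out (the future may still complete later)"
  else:
    echo "got ", fut.read()

waitFor demo()
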
func maxChunkSize*(t: typedesc[bellatrix.SignedBeaconBlock]): uint32 =
MAX_CHUNK_SIZE_BELLATRIX
@@ -668,9 +849,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
await stream.close()
# Read the response
return
await readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
peer, maxChunkSize(ResponseMsg), ResponseMsg, timeout)
return await readResponse(
stream, peer, maxChunkSize(ResponseMsg), ResponseMsg, timeout)
finally:
await stream.closeWithEOF()
@@ -795,17 +975,7 @@ proc handleIncomingStream(network: Eth2Node,
template returnResourceUnavailable(msg: string) =
returnResourceUnavailable(ErrorMsg msg.toBytes)
let s = when useNativeSnappy:
let fs = libp2pInput(conn)
if fs.timeoutToNextByte(TTFB_TIMEOUT):
returnInvalidRequest(errorMsgLit "Request first byte not sent in time")
fs
else:
# TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
conn
# TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
let deadline = sleepAsync RESP_TIMEOUT
const isEmptyMsg = when MsgRec is object:
@@ -823,7 +993,7 @@ proc handleIncomingStream(network: Eth2Node,
else:
try:
awaitWithTimeout(
readChunkPayload(s, peer, maxChunkSize(MsgRec), MsgRec), deadline):
readChunkPayload(conn, peer, maxChunkSize(MsgRec), MsgRec), deadline):
returnInvalidRequest(
errorMsgLit "Request full data not sent in time")


@@ -1,142 +0,0 @@
# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
type
LibP2PInputStream = ref object of InputStream
conn: Connection
const
closingErrMsg = "Failed to close LibP2P stream"
readingErrMsg = "Failed to read from LibP2P stream"
proc libp2pReadOnce(s: LibP2PInputStream,
dst: pointer, dstLen: Natural): Future[Natural] {.async.} =
fsTranslateErrors readingErrMsg:
try:
return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {},
readStartAddr, readLen):
await s.conn.readOnce(readStartAddr, readLen)
except LPStreamEOFError:
s.buffers.eofReached = true
proc libp2pCloseWait(s: LibP2PInputStream) {.async.} =
fsTranslateErrors closingErrMsg:
await safeClose(s.conn)
# TODO: Use the Raising type here
let libp2pInputVTable = InputStreamVTable(
readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural
{.nimcall, gcsafe, raises: [IOError, Defect].} =
doAssert(false, "synchronous reading is not allowed")
,
readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected exception from merely forwarding a future":
return libp2pReadOnce(Libp2pInputStream s, dst, dstLen)
,
closeSync: proc (s: InputStream)
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors closingErrMsg:
s.closeFut = Libp2pInputStream(s).conn.close()
,
closeAsync: proc (s: InputStream): Future[void]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected exception from merely forwarding a future":
return libp2pCloseWait(Libp2pInputStream s)
)
func libp2pInput*(conn: Connection,
pageSize = defaultPageSize): AsyncInputStream =
AsyncInputStream LibP2PInputStream(
vtable: vtableAddr libp2pInputVTable,
buffers: initPageBuffers(pageSize),
conn: conn)
proc readSizePrefix(s: AsyncInputStream,
maxSize: uint32): Future[NetRes[uint32]] {.async.} =
var parser: VarintParser[uint32, ProtoBuf]
while s.readable:
case parser.feedByte(s.read)
of Done:
let res = parser.getResult
if res > maxSize:
return neterr SizePrefixOverflow
else:
return ok res
of Overflow:
return neterr SizePrefixOverflow
of Incomplete:
continue
return neterr UnexpectedEOF
proc readSszValue(s: AsyncInputStream,
size: int,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
if s.readable(size):
s.withReadableRange(size, r):
return r.readValue(SSZ, MsgType)
else:
return neterr UnexpectedEOF
proc readChunkPayload(s: AsyncInputStream,
noSnappy: bool,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
# TODO this needs to sometimes be MAX_CHUNK_SIZE_BELLATRIX, for at least
# bellatrix.SignedBeaconBlock
let prefix = await readSizePrefix(s, MAX_CHUNK_SIZE)
let size = if prefix.isOk: prefix.value.int
else: return err(prefix.error)
if size > 0:
let processingFut = if noSnappy:
readSszValue(s, size, MsgType)
else:
executePipeline(uncompressFramedStream,
readSszValue(size, MsgType))
return await processingFut
else:
return neterr ZeroSizePrefix
proc readResponseChunk(s: AsyncInputStream,
noSnappy: bool,
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
let responseCodeByte = s.read
static: assert ResponseCode.low.ord == 0
if responseCodeByte > ResponseCode.high.byte:
return neterr InvalidResponseCode
let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError, ResourceUnavailable:
let errorMsgChunk = await readChunkPayload(s, noSnappy, string)
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
responseCode: responseCode,
errorMsg: toPrettyString(errorMsg.asSeq()))
of Success:
discard
return await readChunkPayload(s, noSnappy, MsgType)
proc readResponse(s: AsyncInputStream,
noSnappy: bool,
MsgType: type): Future[NetRes[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while s.readable:
results.add(? await s.readResponseChunk(noSnappy, E))
return ok results
else:
if s.readable:
return await s.readResponseChunk(noSnappy, MsgType)
else:
return neterr UnexpectedEOF
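
Note: the `?` in `results.add(? await s.readResponseChunk(noSnappy, E))` above is the early-return operator from stew/results (now nim-results): it unwraps an ok value or immediately returns the error to the caller. A small self-contained illustration:

import std/strutils
import stew/results

proc parsePositive(s: string): Result[int, string] =
  var v: int
  try:
    v = parseInt(s)
  except ValueError:
    return err "not a number"
  if v <= 0:
    return err "not positive"
  ok v

proc sumPositives(a, b: string): Result[int, string] =
  # `?` unwraps the ok value, or propagates the err to sumPositives' caller
  ok((? parsePositive(a)) + (? parsePositive(b)))
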


@@ -1,200 +0,0 @@
# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# TODO: How can this be tested?
proc uncompressFramedStream*(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]]
{.async.} =
var header: array[framingHeader.len, byte]
try:
await conn.readExactly(addr header[0], header.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Unexpected EOF before snappy header"
if header != framingHeader:
return err "Incorrect snappy header"
static:
doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64
var
frameData = newSeq[byte](maxCompressedFrameDataLen + 4)
output = newSeqUninitialized[byte](expectedSize)
written = 0
while written < expectedSize:
var frameHeader: array[4, byte]
try:
await conn.readExactly(addr frameHeader[0], frameHeader.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Snappy frame header missing"
let (id, dataLen) = decodeFrameHeader(frameHeader)
if dataLen > frameData.len:
# In theory, compressed frames could be bigger and still result in a
# valid, small snappy frame, but this would mean they are not getting
# compressed correctly
return err "Snappy frame too big"
if dataLen > 0:
try:
await conn.readExactly(addr frameData[0], dataLen)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Incomplete snappy frame"
if id == chunkCompressed:
if dataLen < 6: # At least CRC + 2 bytes of frame data
return err "Compressed snappy frame too small"
let
crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
uncompressed =
snappy.uncompress(
frameData.toOpenArray(4, dataLen - 1),
output.toOpenArray(written, output.high)).valueOr:
return err "Failed to decompress content"
if maskedCrc(
output.toOpenArray(written, written + uncompressed-1)) != crc:
return err "Snappy content CRC checksum failed"
written += uncompressed
elif id == chunkUncompressed:
if dataLen < 5: # At least one byte of data
return err "Uncompressed snappy frame too small"
let uncompressed = dataLen - 4
if uncompressed > maxUncompressedFrameDataLen.int:
return err "Snappy frame size too large"
if uncompressed > output.len - written:
return err "Too much data"
let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc:
return err "Snappy content CRC checksum failed"
output[written..<written + uncompressed] =
frameData.toOpenArray(4, dataLen-1)
written += uncompressed
elif id < 0x80:
# Reserved unskippable chunks (chunk types 0x02-0x7f)
# if we encounter this type of chunk, stop decoding
# the spec says it is an error
return err "Invalid snappy chunk type"
else:
# Reserved skippable chunks (chunk types 0x80-0xfe)
# including STREAM_HEADER (0xff) should be skipped
continue
return ok output
proc readChunkPayload*(conn: Connection, peer: Peer,
maxChunkSize: uint32,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
let sm = now(chronos.Moment)
let size =
try: await conn.readVarint()
except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError
# TODO compiler error - haha, uncaught exception
# Error: unhandled exception: closureiters.nim(322, 17) `c[i].kind == nkType` [AssertionError]
return neterr UnexpectedEOF
except LPStreamIncompleteError:
return neterr UnexpectedEOF
except InvalidVarintError:
return neterr UnexpectedEOF
if size > maxChunkSize:
return neterr SizePrefixOverflow
if size == 0:
return neterr ZeroSizePrefix
# The `size.int` conversion is safe because `size` is bounded to `MAX_CHUNK_SIZE`
let data = await conn.uncompressFramedStream(size.int)
if data.isOk:
# `10` is the maximum size of a varint on the wire, so the error in this
# throughput estimate cannot be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm,
uint64(10 + size))
return ok SSZ.decode(data.get(), MsgType)
else:
debug "Snappy decompression/read failed", msg = $data.error, conn
return neterr InvalidSnappyBytes
proc readResponseChunk(conn: Connection, peer: Peer, maxChunkSize: uint32,
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
mixin readChunkPayload
try:
var responseCodeByte: byte
try:
await conn.readExactly(addr responseCodeByte, 1)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr PotentiallyExpectedEOF
static: assert ResponseCode.low.ord == 0
if responseCodeByte > ResponseCode.high.byte:
return neterr InvalidResponseCode
let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError, ResourceUnavailable:
let
errorMsgChunk = await readChunkPayload(
conn, peer, maxChunkSize, ErrorMsg)
errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
errorMsgStr = toPrettyString(errorMsg.asSeq)
debug "Error response from peer", responseCode, errMsg = errorMsgStr
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
responseCode: responseCode,
errorMsg: errorMsgStr)
of Success:
discard
return await readChunkPayload(conn, peer, maxChunkSize, MsgType)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr UnexpectedEOF
proc readResponse(conn: Connection, peer: Peer, maxChunkSize: uint32,
MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
# Because we interleave networking with response processing, it may
happen that reading all chunks takes longer than a strict deadline
# timeout would allow, so we allow each chunk a new timeout instead.
# The problem is exacerbated by the large number of round-trips to the
# poll loop that each future along the way causes.
trace "reading chunk", conn
let nextFut = conn.readResponseChunk(peer, maxChunkSize, E)
if not await nextFut.withTimeout(timeout):
return neterr(ReadResponseTimeout)
let nextRes = nextFut.read()
if nextRes.isErr:
if nextRes.error.kind == PotentiallyExpectedEOF:
trace "EOF chunk", conn, err = nextRes.error
return ok results
trace "Error chunk", conn, err = nextRes.error
return err nextRes.error
else:
trace "Got chunk", conn
results.add nextRes.value
else:
let nextFut = conn.readResponseChunk(peer, maxChunkSize, MsgType)
if not await nextFut.withTimeout(timeout):
return neterr(ReadResponseTimeout)
return nextFut.read()


@@ -118,8 +118,6 @@ switch("passL", "-fno-omit-frame-pointer")
# for heap-usage-by-instance-type metrics and object base-type strings
--define:nimTypeNames
# switch("define", "snappy_implementation=libp2p")
# TODO https://github.com/status-im/nimbus-eth2/issues/3130
# We are still seeing problems with the websock package, so we stick to using news:
switch("define", "json_rpc_websocket_package=news")