Integrate the async Snappy implementation
This commit is contained in:
parent
a739d7e8d6
commit
9538b60704
|
@ -5,7 +5,7 @@ import
|
|||
|
||||
# Status libs
|
||||
stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
|
||||
faststreams/outputs, snappy, snappy/framing,
|
||||
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
|
||||
json_serialization, json_serialization/std/[net, options],
|
||||
chronos, chronicles, metrics,
|
||||
# TODO: create simpler to use libp2p modules that use re-exports
|
||||
|
@ -168,6 +168,61 @@ declarePublicGauge libp2p_successful_dials,
|
|||
declarePublicGauge libp2p_peers,
|
||||
"Number of active libp2p peers"
|
||||
|
||||
type
|
||||
LibP2PInputStream = ref object of InputStream
|
||||
conn: Connection
|
||||
|
||||
const
|
||||
closingErrMsg = "Failed to close LibP2P stream"
|
||||
readingErrMsg = "Failed to read from LibP2P stream"
|
||||
|
||||
proc safeClose(conn: Connection) {.async.} =
|
||||
if not conn.closed:
|
||||
await close(conn)
|
||||
|
||||
proc libp2pCloseWait(s: LibP2PInputStream) {.async.} =
|
||||
fsTranslateErrors closingErrMsg:
|
||||
await safeClose(s.conn)
|
||||
|
||||
proc libp2pReadOnce(s: LibP2PInputStream,
|
||||
dst: pointer, dstLen: Natural): Future[Natural] {.async.} =
|
||||
fsTranslateErrors readingErrMsg:
|
||||
try:
|
||||
return implementSingleRead(s.buffers, dst, dstLen, ReadFlags {},
|
||||
readStartAddr, readLen):
|
||||
await s.conn.readOnce(readStartAddr, readLen)
|
||||
except LPStreamEOFError:
|
||||
s.buffers.eofReached = true
|
||||
|
||||
# TODO: Use the Raising type here
|
||||
let libp2pInputVTable = InputStreamVTable(
|
||||
readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
doAssert(false, "synchronous reading is not allowed")
|
||||
,
|
||||
readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
fsTranslateErrors "Unexpected exception from merely forwarding a future":
|
||||
return libp2pReadOnce(Libp2pInputStream s, dst, dstLen)
|
||||
,
|
||||
closeSync: proc (s: InputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
fsTranslateErrors closingErrMsg:
|
||||
s.closeFut = Libp2pInputStream(s).conn.close()
|
||||
,
|
||||
closeAsync: proc (s: InputStream): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
fsTranslateErrors "Unexpected exception from merely forwarding a future":
|
||||
return libp2pCloseWait(Libp2pInputStream s)
|
||||
)
|
||||
|
||||
func libp2pInput*(conn: Connection,
|
||||
pageSize = defaultPageSize): AsyncInputStream =
|
||||
AsyncInputStream LibP2PInputStream(
|
||||
vtable: vtableAddr libp2pInputVTable,
|
||||
buffers: initPageBuffers(pageSize),
|
||||
conn: conn)
|
||||
|
||||
template libp2pProtocol*(name: string, version: int) {.pragma.}
|
||||
|
||||
template `$`*(peer: Peer): string = id(peer.info)
|
||||
|
@ -222,10 +277,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
|||
peer.network.peerPool.release(peer)
|
||||
peer.info.close()
|
||||
|
||||
proc safeClose(conn: Connection) {.async.} =
|
||||
if not conn.closed:
|
||||
await close(conn)
|
||||
|
||||
include eth/p2p/p2p_backends_helpers
|
||||
include eth/p2p/p2p_tracing
|
||||
|
||||
|
@ -255,23 +306,11 @@ proc disconnectAndRaise(peer: Peer,
|
|||
await peer.disconnect(r)
|
||||
raisePeerDisconnected(msg, r)
|
||||
|
||||
proc readChunk(conn: Connection,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
|
||||
|
||||
proc readSizePrefix(conn: Connection,
|
||||
deadline: Future[void]): Future[int] {.async.} =
|
||||
proc readSizePrefix(s: AsyncInputStream): Future[int] {.async.} =
|
||||
trace "about to read msg size prefix"
|
||||
var parser: VarintParser[uint64, ProtoBuf]
|
||||
while true:
|
||||
var nextByte: byte
|
||||
var readNextByte = conn.readExactly(addr nextByte, 1)
|
||||
await readNextByte or deadline
|
||||
if not readNextByte.finished:
|
||||
trace "size prefix byte not received in time"
|
||||
return -1
|
||||
case parser.feedByte(nextByte)
|
||||
while s.readable:
|
||||
case parser.feedByte(s.read)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > uint64(MAX_CHUNK_SIZE):
|
||||
|
@ -286,117 +325,61 @@ proc readSizePrefix(conn: Connection,
|
|||
of Incomplete:
|
||||
continue
|
||||
|
||||
proc readMsgBytes(conn: Connection,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Bytes] {.async.} =
|
||||
trace "about to read message bytes", withResponseCode
|
||||
proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} =
|
||||
let size = await s.readSizePrefix
|
||||
if size > 0 and s.readable(size):
|
||||
s.nonBlockingReads(ss):
|
||||
return ss.readValue(SSZ, MsgType)
|
||||
else:
|
||||
raise newException(CatchableError,
|
||||
"Failed to read an incoming message size prefix")
|
||||
|
||||
try:
|
||||
if withResponseCode:
|
||||
var responseCode: byte
|
||||
trace "about to read response code"
|
||||
var readResponseCode = conn.readExactly(addr responseCode, 1)
|
||||
try:
|
||||
await readResponseCode or deadline
|
||||
except LPStreamEOFError:
|
||||
trace "end of stream received"
|
||||
return
|
||||
proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} =
|
||||
if s.readable:
|
||||
let responseCode = s.read
|
||||
static: assert responseCode.type.low == 0
|
||||
if responseCode > ResponseCode.high.byte:
|
||||
return err("Invalid response code")
|
||||
|
||||
if not readResponseCode.finished:
|
||||
trace "response code not received in time"
|
||||
return
|
||||
case ResponseCode(responseCode):
|
||||
of InvalidRequest, ServerError:
|
||||
return err(await s.readSszValue(string))
|
||||
of Success:
|
||||
return ok true
|
||||
else:
|
||||
return ok false
|
||||
|
||||
if responseCode > ResponseCode.high.byte:
|
||||
trace "invalid response code", responseCode
|
||||
return
|
||||
|
||||
logScope: responseCode = ResponseCode(responseCode)
|
||||
trace "got response code"
|
||||
|
||||
case ResponseCode(responseCode)
|
||||
of InvalidRequest, ServerError:
|
||||
let responseErrMsg = await conn.readChunk(string, false, deadline)
|
||||
debug "P2P request resulted in error", responseErrMsg
|
||||
return
|
||||
|
||||
of Success:
|
||||
# The response is OK, the execution continues below
|
||||
discard
|
||||
|
||||
var sizePrefix = await conn.readSizePrefix(deadline)
|
||||
trace "got msg size prefix", sizePrefix
|
||||
|
||||
if sizePrefix == -1:
|
||||
debug "Failed to read an incoming message size prefix", peer = conn.peerId
|
||||
return
|
||||
|
||||
if sizePrefix == 0:
|
||||
debug "Received SSZ with zero size", peer = conn.peerId
|
||||
return
|
||||
|
||||
trace "about to read msg bytes", len = sizePrefix
|
||||
var msgBytes = newSeq[byte](sizePrefix)
|
||||
var readBody = conn.readExactly(addr msgBytes[0], sizePrefix)
|
||||
await readBody or deadline
|
||||
if not readBody.finished:
|
||||
trace "msg bytes not received in time"
|
||||
return
|
||||
|
||||
trace "got message bytes", len = sizePrefix
|
||||
return msgBytes
|
||||
|
||||
except TransportIncompleteError:
|
||||
return @[]
|
||||
|
||||
proc readChunk(conn: Connection,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
var msgBytes = await conn.readMsgBytes(withResponseCode, deadline)
|
||||
try:
|
||||
if msgBytes.len > 0:
|
||||
return some SSZ.decode(msgBytes, MsgType)
|
||||
except SerializationError as err:
|
||||
debug "Failed to decode a network message",
|
||||
msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
return
|
||||
|
||||
proc readResponse(
|
||||
conn: Connection,
|
||||
MsgType: type,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
proc readChunk(s: AsyncInputStream,
|
||||
MsgType: typedesc): Future[Option[MsgType]] {.async.} =
|
||||
let rc = await s.readResponseCode()
|
||||
if rc.isOk:
|
||||
if rc[]:
|
||||
return some(await readSszValue(s, MsgType))
|
||||
else:
|
||||
trace "Failed to read response code",
|
||||
reason = rc.error
|
||||
|
||||
proc readResponse(s: AsyncInputStream,
|
||||
MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
when MsgType is seq:
|
||||
type E = ElemType(MsgType)
|
||||
var results: MsgType
|
||||
while true:
|
||||
let nextRes = await conn.readChunk(E, true, deadline)
|
||||
let nextRes = await s.readChunk(E)
|
||||
if nextRes.isNone: break
|
||||
results.add nextRes.get
|
||||
if results.len > 0:
|
||||
return some(results)
|
||||
else:
|
||||
return await conn.readChunk(MsgType, true, deadline)
|
||||
return await s.readChunk(MsgType)
|
||||
|
||||
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
|
||||
var s = memoryOutput()
|
||||
s.append byte(responseCode)
|
||||
s.appendVarint errMsg.len
|
||||
s.appendValue SSZ, errMsg
|
||||
s.write byte(responseCode)
|
||||
s.writeVarint errMsg.len
|
||||
s.writeValue SSZ, errMsg
|
||||
s.getOutput
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
conn: Connection,
|
||||
err: ref SerializationError,
|
||||
msgName: string,
|
||||
msgBytes: Bytes) {.async.} =
|
||||
debug "Received an invalid request",
|
||||
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
|
||||
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
|
||||
await conn.write(responseBytes)
|
||||
await conn.close()
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
conn: Connection,
|
||||
responseCode: ResponseCode,
|
||||
|
@ -422,11 +405,11 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
|||
let stream = streamFut.read
|
||||
try:
|
||||
var s = memoryOutput()
|
||||
s.appendVarint requestBytes.len.uint64
|
||||
s.writeVarint requestBytes.len.uint64
|
||||
if peer.supportsSnappy:
|
||||
framing_format_compress(s, requestBytes)
|
||||
else:
|
||||
s.append requestBytes
|
||||
s.write requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.write(bytes)
|
||||
finally:
|
||||
|
@ -436,24 +419,24 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
|||
# I hope to reduce this when I increse the reliance on output streams.
|
||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||
var s = memoryOutput()
|
||||
s.append byte(Success)
|
||||
s.appendVarint payload.len.uint64
|
||||
s.append payload
|
||||
s.write byte(Success)
|
||||
s.writeVarint payload.len.uint64
|
||||
s.write payload
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.write(bytes)
|
||||
|
||||
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
|
||||
var s = memoryOutput()
|
||||
s.append byte(Success)
|
||||
s.appendValue SSZ, sizePrefixed(val)
|
||||
s.write byte(Success)
|
||||
s.writeValue SSZ, sizePrefixed(val)
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.write(bytes)
|
||||
|
||||
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
|
||||
var s = memoryOutput()
|
||||
for chunk in chunks:
|
||||
s.append byte(Success)
|
||||
s.appendValue SSZ, sizePrefixed(chunk)
|
||||
s.write byte(Success)
|
||||
s.writeValue SSZ, sizePrefixed(chunk)
|
||||
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.write(bytes)
|
||||
|
@ -473,19 +456,21 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
|||
return none(ResponseMsg)
|
||||
|
||||
let stream = streamFut.read
|
||||
|
||||
try:
|
||||
# Send the request
|
||||
var s = memoryOutput()
|
||||
s.appendVarint requestBytes.len.uint64
|
||||
s.writeVarint requestBytes.len.uint64
|
||||
if peer.supportsSnappy:
|
||||
framing_format_compress(s, requestBytes)
|
||||
else:
|
||||
s.append requestBytes
|
||||
s.write requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.write(bytes)
|
||||
|
||||
# Read the response
|
||||
return await stream.readResponse(ResponseMsg, deadline)
|
||||
let response = libp2pInput(stream)
|
||||
return await response.readResponse(ResponseMsg)
|
||||
finally:
|
||||
await safeClose(stream)
|
||||
|
||||
|
@ -563,8 +548,10 @@ proc implementSendProcBody(sendProc: SendProc) =
|
|||
|
||||
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
||||
|
||||
proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
|
||||
MsgType, Format: distinct type) {.async, gcsafe.} =
|
||||
proc handleIncomingStream(network: Eth2Node,
|
||||
conn: Connection,
|
||||
useSnappy: bool,
|
||||
MsgType: type) {.async, gcsafe.} =
|
||||
mixin callUserHandler, RecType
|
||||
const msgName = typetraits.name(MsgType)
|
||||
|
||||
|
@ -576,39 +563,53 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
|
|||
# defer: setLogLevel(LogLevel.DEBUG)
|
||||
# trace "incoming " & `msgNameLit` & " conn"
|
||||
|
||||
let peer = peerFromStream(network, conn)
|
||||
|
||||
try:
|
||||
let peer = peerFromStream(network, conn)
|
||||
|
||||
let s = libp2pInput(conn)
|
||||
|
||||
if s.timeoutToNextByte(TTFB_TIMEOUT):
|
||||
await sendErrorResponse(peer, conn, InvalidRequest,
|
||||
"Request first byte not sent in time")
|
||||
return
|
||||
|
||||
let deadline = sleepAsync RESP_TIMEOUT
|
||||
var msgBytes = await readMsgBytes(conn, false, deadline)
|
||||
|
||||
if msgBytes.len == 0:
|
||||
await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg)
|
||||
return
|
||||
let processingFut = if useSnappy:
|
||||
s.executePipeline(uncompressFramedStream,
|
||||
readSszValue MsgType)
|
||||
else:
|
||||
s.readSszValue MsgType
|
||||
|
||||
if useSnappy:
|
||||
msgBytes = framingFormatUncompress(msgBytes)
|
||||
|
||||
type MsgRec = RecType(MsgType)
|
||||
var msg: MsgRec
|
||||
try:
|
||||
msg = decode(Format, msgBytes, MsgRec)
|
||||
await processingFut or deadline
|
||||
except SerializationError as err:
|
||||
await sendErrorResponse(peer, conn, err, msgName, msgBytes)
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
|
||||
return
|
||||
except SnappyError as err:
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, err.msg)
|
||||
return
|
||||
|
||||
if not processingFut.finished:
|
||||
processingFut.cancel()
|
||||
await sendErrorResponse(peer, conn, InvalidRequest,
|
||||
"Request full data not sent in time")
|
||||
return
|
||||
|
||||
if processingFut.error != nil:
|
||||
await sendErrorResponse(peer, conn, ServerError,
|
||||
processingFut.error.msg)
|
||||
return
|
||||
except Exception as err:
|
||||
# TODO. This is temporary code that should be removed after interop.
|
||||
# It can be enabled only in certain diagnostic builds where it should
|
||||
# re-raise the exception.
|
||||
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
|
||||
await sendErrorResponse(peer, conn, ServerError, err.msg)
|
||||
raise err
|
||||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(msg))
|
||||
await callUserHandler(peer, conn, msg)
|
||||
logReceivedMsg(peer, processingFut.read)
|
||||
await callUserHandler(peer, conn, processingFut.read)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, conn, ServerError, err.msg)
|
||||
|
||||
except CatchableError as err:
|
||||
debug "Error processing an incoming request", err = err.msg
|
||||
|
||||
finally:
|
||||
await safeClose(conn)
|
||||
|
||||
|
@ -858,7 +859,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
proc sszThunk(`streamVar`: `Connection`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, false,
|
||||
`MsgStrongRecName`, `Format`)
|
||||
`MsgRecName`)
|
||||
|
||||
mount `networkVar`.switch,
|
||||
LPProtocol(codec: `codecNameLit` & "ssz",
|
||||
|
@ -867,7 +868,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
proc snappyThunk(`streamVar`: `Connection`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, true,
|
||||
`MsgStrongRecName`, `Format`)
|
||||
`MsgRecName`)
|
||||
|
||||
mount `networkVar`.switch,
|
||||
LPProtocol(codec: `codecNameLit` & "ssz_snappy",
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit e9f50bc8478b261c734f704d3aac3853bcbe8a30
|
||||
Subproject commit 8b863f7798f7e1a6a9266b88e7bca5aa0cb5ed8a
|
|
@ -1 +1 @@
|
|||
Subproject commit cef0a2b4d247bdd9038364f0e41230edf42706d6
|
||||
Subproject commit 1c0e0adf459932e45b2a7483f704086cee7c906a
|
Loading…
Reference in New Issue