mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-26 21:20:34 +00:00
Enable Snappy by default (using LibP2P steams for now)
This refactors the newly added Snappy streaming back-ends trying to make them more similar and to reduce the code duplication to a minimum.
This commit is contained in:
parent
f055fad08a
commit
75c1c6a95c
@ -4,8 +4,8 @@ import
|
||||
options as stdOptions, net as stdNet,
|
||||
|
||||
# Status libs
|
||||
stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
|
||||
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
|
||||
stew/[varints, base58, bitseqs, results], stew/shims/[macros, tables],
|
||||
stint, faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
|
||||
json_serialization, json_serialization/std/[net, options],
|
||||
chronos, chronicles, metrics,
|
||||
# TODO: create simpler to use libp2p modules that use re-exports
|
||||
@ -27,7 +27,7 @@ import
|
||||
|
||||
export
|
||||
version, multiaddress, peer_pool, peerinfo, p2pProtocol,
|
||||
libp2p_json_serialization, ssz, peer
|
||||
libp2p_json_serialization, ssz, peer, results
|
||||
|
||||
logScope:
|
||||
topics = "networking"
|
||||
@ -74,7 +74,7 @@ type
|
||||
protocolStates*: seq[RootRef]
|
||||
maxInactivityAllowed*: Duration
|
||||
score*: int
|
||||
supportsSnappy: bool
|
||||
lacksSnappy: bool
|
||||
|
||||
ConnectionState* = enum
|
||||
None,
|
||||
@ -86,6 +86,7 @@ type
|
||||
UntypedResponder = object
|
||||
peer*: Peer
|
||||
stream*: Connection
|
||||
noSnappy*: bool
|
||||
|
||||
Responder*[MsgType] = distinct UntypedResponder
|
||||
|
||||
@ -133,6 +134,30 @@ type
|
||||
|
||||
TransmissionError* = object of CatchableError
|
||||
|
||||
Eth2NetworkingErrorKind* = enum
|
||||
BrokenConnection
|
||||
ReceivedErrorResponse
|
||||
UnexpectedEOF
|
||||
PotentiallyExpectedEOF
|
||||
InvalidResponseCode
|
||||
InvalidSnappyBytes
|
||||
InvalidSszBytes
|
||||
StreamOpenTimeout
|
||||
ReadResponseTimeout
|
||||
ZeroSizePrefix
|
||||
SizePrefixOverflow
|
||||
|
||||
Eth2NetworkingError = object
|
||||
case kind*: Eth2NetworkingErrorKind
|
||||
of ReceivedErrorResponse:
|
||||
responseCode: ResponseCode
|
||||
errorMsg: string
|
||||
else:
|
||||
discard
|
||||
|
||||
NetRes*[T] = Result[T, Eth2NetworkingError]
|
||||
## This is type returned from all network requests
|
||||
|
||||
const
|
||||
clientId* = "Nimbus beacon node v" & fullVersionStr
|
||||
networkKeyFilename = "privkey.protobuf"
|
||||
@ -155,6 +180,9 @@ const
|
||||
PeerScoreLimit* = 0
|
||||
## Score after which peer will be kicked
|
||||
|
||||
template neterr(kindParam: Eth2NetworkingErrorKind): auto =
|
||||
err(type(result), Eth2NetworkingError(kind: kindParam))
|
||||
|
||||
# Metrics for tracking attestation and beacon block loss
|
||||
declareCounter gossip_messages_sent,
|
||||
"Number of gossip messages sent by this peer"
|
||||
@ -187,8 +215,23 @@ chronicles.formatIt(Peer): $it
|
||||
template remote*(peer: Peer): untyped =
|
||||
peer.info.peerId
|
||||
|
||||
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
|
||||
dial(node.switch, peer.info, protocolId)
|
||||
proc openStream(node: Eth2Node,
|
||||
peer: Peer,
|
||||
protocolId: string): Future[Connection] {.async.} =
|
||||
let protocolId = protocolId & (if peer.lacksSnappy: "ssz" else: "ssz_snappy")
|
||||
try:
|
||||
result = await dial(node.switch, peer.info, protocolId)
|
||||
except CancelledError:
|
||||
raise
|
||||
except CatchableError:
|
||||
# TODO: LibP2P should raise a more specific exception here
|
||||
if peer.lacksSnappy == false:
|
||||
peer.lacksSnappy = true
|
||||
trace "Snappy connection failed. Trying without Snappy",
|
||||
peer, protocolId
|
||||
return await openStream(node, peer, protocolId)
|
||||
else:
|
||||
raise
|
||||
|
||||
func peerId(conn: Connection): PeerID =
|
||||
# TODO: Can this be `nil`?
|
||||
@ -262,27 +305,36 @@ proc disconnectAndRaise(peer: Peer,
|
||||
await peer.disconnect(r)
|
||||
raisePeerDisconnected(msg, r)
|
||||
|
||||
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
|
||||
var s = memoryOutput()
|
||||
s.write byte(responseCode)
|
||||
s.writeVarint errMsg.len
|
||||
s.writeValue SSZ, errMsg
|
||||
s.getOutput
|
||||
proc writeChunk*(conn: Connection,
|
||||
responseCode: Option[ResponseCode],
|
||||
payload: Bytes,
|
||||
noSnappy: bool) {.async.} =
|
||||
var output = memoryOutput()
|
||||
|
||||
if responseCode.isSome:
|
||||
output.write byte(responseCode.get)
|
||||
|
||||
output.write varintBytes(payload.len.uint64)
|
||||
|
||||
if noSnappy:
|
||||
output.write(payload)
|
||||
else:
|
||||
output.write(framingFormatCompress payload)
|
||||
|
||||
await conn.write(output.getOutput)
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
conn: Connection,
|
||||
noSnappy: bool,
|
||||
responseCode: ResponseCode,
|
||||
errMsg: string) {.async.} =
|
||||
debug "Error processing request", peer, responseCode, errMsg
|
||||
|
||||
let responseBytes = encodeErrorMsg(ServerError, errMsg)
|
||||
await conn.write(responseBytes)
|
||||
await conn.close()
|
||||
await conn.writeChunk(some responseCode, SSZ.encode(errMsg), noSnappy)
|
||||
|
||||
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
|
||||
var
|
||||
deadline = sleepAsync RESP_TIMEOUT
|
||||
protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
|
||||
streamFut = peer.network.openStream(peer, protocolId)
|
||||
|
||||
await streamFut or deadline
|
||||
@ -293,42 +345,20 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
||||
|
||||
let stream = streamFut.read
|
||||
try:
|
||||
var s = memoryOutput()
|
||||
s.writeVarint requestBytes.len.uint64
|
||||
if peer.supportsSnappy:
|
||||
framing_format_compress(s, requestBytes)
|
||||
else:
|
||||
s.write requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.write(bytes)
|
||||
await stream.writeChunk(none ResponseCode, requestBytes, peer.lacksSnappy)
|
||||
finally:
|
||||
await safeClose(stream)
|
||||
|
||||
# TODO There is too much duplication in the responder functions, but
|
||||
# I hope to reduce this when I increse the reliance on output streams.
|
||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||
var s = memoryOutput()
|
||||
s.write byte(Success)
|
||||
s.writeVarint payload.len.uint64
|
||||
s.write payload
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.write(bytes)
|
||||
await responder.stream.writeChunk(some Success, payload, responder.noSnappy)
|
||||
|
||||
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
|
||||
var s = memoryOutput()
|
||||
s.write byte(Success)
|
||||
s.writeValue SSZ, sizePrefixed(val)
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.write(bytes)
|
||||
await responder.stream.writeChunk(some Success, SSZ.encode(val),
|
||||
responder.noSnappy)
|
||||
|
||||
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
|
||||
var s = memoryOutput()
|
||||
for chunk in chunks:
|
||||
s.write byte(Success)
|
||||
s.writeValue SSZ, sizePrefixed(chunk)
|
||||
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.write(bytes)
|
||||
await sendResponseChunkObj(responder, chunk)
|
||||
|
||||
when useNativeSnappy:
|
||||
include faststreams_backend
|
||||
@ -348,36 +378,29 @@ template awaitWithTimeout[T](operation: Future[T],
|
||||
|
||||
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||
ResponseMsg: type,
|
||||
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
|
||||
var
|
||||
deadline = sleepAsync timeout
|
||||
protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
|
||||
timeout: Duration): Future[NetRes[ResponseMsg]]
|
||||
{.gcsafe, async.} =
|
||||
var deadline = sleepAsync timeout
|
||||
|
||||
let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
|
||||
deadline): return none(ResponseMsg)
|
||||
deadline): return neterr StreamOpenTimeout
|
||||
try:
|
||||
# Send the request
|
||||
var s = memoryOutput()
|
||||
s.writeVarint requestBytes.len.uint64
|
||||
if peer.supportsSnappy:
|
||||
framing_format_compress(s, requestBytes)
|
||||
else:
|
||||
s.write requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.write(bytes)
|
||||
await stream.writeChunk(none ResponseCode, requestBytes, peer.lacksSnappy)
|
||||
|
||||
# Read the response
|
||||
when useNativeSnappy:
|
||||
return awaitWithTimeout(readResponse(libp2pInput(stream), ResponseMsg),
|
||||
deadline, none(ResponseMsg))
|
||||
else:
|
||||
return await readResponse(stream, ResponseMsg, deadline)
|
||||
return awaitWithTimeout(
|
||||
readResponse(when useNativeSnappy: libp2pInput(stream)
|
||||
else: stream,
|
||||
peer.lacksSnappy,
|
||||
ResponseMsg),
|
||||
deadline, neterr(ReadResponseTimeout))
|
||||
finally:
|
||||
await safeClose(stream)
|
||||
|
||||
proc init*[MsgType](T: type Responder[MsgType],
|
||||
peer: Peer, conn: Connection): T =
|
||||
T(UntypedResponder(peer: peer, stream: conn))
|
||||
peer: Peer, conn: Connection, noSnappy: bool): T =
|
||||
T(UntypedResponder(peer: peer, stream: conn, noSnappy: noSnappy))
|
||||
|
||||
template write*[M](r: var Responder[M], val: auto): auto =
|
||||
mixin send
|
||||
@ -451,7 +474,7 @@ proc implementSendProcBody(sendProc: SendProc) =
|
||||
|
||||
proc handleIncomingStream(network: Eth2Node,
|
||||
conn: Connection,
|
||||
useSnappy: bool,
|
||||
noSnappy: bool,
|
||||
MsgType: type) {.async, gcsafe.} =
|
||||
mixin callUserHandler, RecType
|
||||
|
||||
@ -469,73 +492,68 @@ proc handleIncomingStream(network: Eth2Node,
|
||||
try:
|
||||
let peer = peerFromStream(network, conn)
|
||||
|
||||
when useNativeSnappy:
|
||||
let s = libp2pInput(conn)
|
||||
template returnInvalidRequest(msg: string) =
|
||||
await sendErrorResponse(peer, conn, noSnappy, InvalidRequest, msg)
|
||||
return
|
||||
|
||||
if s.timeoutToNextByte(TTFB_TIMEOUT):
|
||||
await sendErrorResponse(peer, conn, InvalidRequest,
|
||||
"Request first byte not sent in time")
|
||||
return
|
||||
let s = when useNativeSnappy:
|
||||
let fs = libp2pInput(conn)
|
||||
|
||||
let deadline = sleepAsync RESP_TIMEOUT
|
||||
|
||||
let processingFut = if useSnappy:
|
||||
s.executePipeline(uncompressFramedStream,
|
||||
readSszValue MsgRec)
|
||||
else:
|
||||
s.readSszValue MsgRec
|
||||
|
||||
try:
|
||||
await processingFut or deadline
|
||||
except SerializationError as err:
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
|
||||
return
|
||||
except SnappyError as err:
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, err.msg)
|
||||
return
|
||||
|
||||
if not processingFut.finished:
|
||||
processingFut.cancel()
|
||||
await sendErrorResponse(peer, conn, InvalidRequest,
|
||||
"Request full data not sent in time")
|
||||
return
|
||||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(processingFut.read))
|
||||
await callUserHandler(peer, conn, processingFut.read)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, conn, ServerError, err.msg)
|
||||
if fs.timeoutToNextByte(TTFB_TIMEOUT):
|
||||
returnInvalidRequest "Request first byte not sent in time"
|
||||
|
||||
fs
|
||||
else:
|
||||
let deadline = sleepAsync RESP_TIMEOUT
|
||||
var msgBytes = await readMsgBytes(conn, false, deadline)
|
||||
# TODO The TTFB timeout is not implemented in LibP2P streams back-end
|
||||
conn
|
||||
|
||||
if msgBytes.len == 0:
|
||||
await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg)
|
||||
return
|
||||
let deadline = sleepAsync RESP_TIMEOUT
|
||||
|
||||
if useSnappy:
|
||||
msgBytes = framingFormatUncompress(msgBytes)
|
||||
let msg = try:
|
||||
awaitWithTimeout(readChunkPayload(s, noSnappy, MsgRec), deadline):
|
||||
returnInvalidRequest "Request full data not sent in time"
|
||||
|
||||
var msg: MsgRec
|
||||
try:
|
||||
msg = decode(SSZ, msgBytes, MsgRec)
|
||||
except SerializationError as err:
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, err.formatMsg("msg"))
|
||||
return
|
||||
except Exception as err:
|
||||
# TODO. This is temporary code that should be removed after interop.
|
||||
# It can be enabled only in certain diagnostic builds where it should
|
||||
# re-raise the exception.
|
||||
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
|
||||
await sendErrorResponse(peer, conn, ServerError, err.msg)
|
||||
raise err
|
||||
except SerializationError as err:
|
||||
returnInvalidRequest err.formatMsg("msg")
|
||||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(msg))
|
||||
await callUserHandler(peer, conn, msg)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, conn, ServerError, err.msg)
|
||||
except SnappyError as err:
|
||||
returnInvalidRequest err.msg
|
||||
|
||||
if msg.isErr:
|
||||
let (responseCode, errMsg) = case msg.error.kind
|
||||
of UnexpectedEOF, PotentiallyExpectedEOF:
|
||||
(InvalidRequest, "Incomplete request")
|
||||
|
||||
of InvalidSnappyBytes:
|
||||
(InvalidRequest, "Failed to decompress snappy payload")
|
||||
|
||||
of InvalidSszBytes:
|
||||
(InvalidRequest, "Failed to decode SSZ payload")
|
||||
|
||||
of ZeroSizePrefix:
|
||||
(InvalidRequest, "The request chunk cannot have a size of zero")
|
||||
|
||||
of SizePrefixOverflow:
|
||||
(InvalidRequest, "The chunk size exceed the maximum allowed")
|
||||
|
||||
of InvalidResponseCode, ReceivedErrorResponse,
|
||||
StreamOpenTimeout, ReadResponseTimeout:
|
||||
# These shouldn't be possible in a request, because
|
||||
# there are no response codes being read, no stream
|
||||
# openings and no reading of responses:
|
||||
(ServerError, "Internal server error")
|
||||
|
||||
of BrokenConnection:
|
||||
return
|
||||
|
||||
await sendErrorResponse(peer, conn, noSnappy, responseCode, errMsg)
|
||||
return
|
||||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(msg.get))
|
||||
await callUserHandler(peer, conn, noSnappy, msg.get)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, conn, noSnappy, ServerError, err.msg)
|
||||
|
||||
except CatchableError as err:
|
||||
debug "Error processing an incoming request", err = err.msg
|
||||
@ -720,6 +738,7 @@ proc registerMsg(protocol: ProtocolInfo,
|
||||
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
var
|
||||
Format = ident "SSZ"
|
||||
Bool = bindSym "bool"
|
||||
Responder = bindSym "Responder"
|
||||
Connection = bindSym "Connection"
|
||||
Peer = bindSym "Peer"
|
||||
@ -729,6 +748,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
msgVar = ident "msg"
|
||||
networkVar = ident "network"
|
||||
callUserHandler = ident "callUserHandler"
|
||||
noSnappyVar = ident "noSnappy"
|
||||
|
||||
p.useRequestIds = false
|
||||
p.useSingleRecordInlining = true
|
||||
@ -740,6 +760,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
result.registerProtocol = bindSym "registerProtocol"
|
||||
result.setEventHandlers = bindSym "setEventHandlers"
|
||||
result.SerializationFormat = Format
|
||||
result.RequestResultsWrapper = ident "NetRes"
|
||||
result.ResponderType = Responder
|
||||
|
||||
result.afterProtocolInit = proc (p: P2PProtocol) =
|
||||
@ -758,7 +779,8 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
# Request procs need an extra param - the stream where the response
|
||||
# should be written:
|
||||
msg.userHandler.params.insert(2, newIdentDefs(streamVar, Connection))
|
||||
msg.initResponderCall.add streamVar
|
||||
msg.userHandler.params.insert(3, newIdentdefs(noSnappyVar, Bool))
|
||||
msg.initResponderCall.add [streamVar, noSnappyVar]
|
||||
|
||||
##
|
||||
## Implement the Thunk:
|
||||
@ -775,20 +797,22 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
##
|
||||
let
|
||||
protocolMounterName = ident(msgName & "_mounter")
|
||||
userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
|
||||
userHandlerCall = msg.genUserHandlerCall(
|
||||
msgVar, [peerVar, streamVar, noSnappyVar])
|
||||
|
||||
var mounter: NimNode
|
||||
if msg.userHandler != nil:
|
||||
protocol.outRecvProcs.add quote do:
|
||||
template `callUserHandler`(`peerVar`: `Peer`,
|
||||
`streamVar`: `Connection`,
|
||||
`noSnappyVar`: bool,
|
||||
`msgVar`: `MsgRecName`): untyped =
|
||||
`userHandlerCall`
|
||||
|
||||
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
|
||||
proc sszThunk(`streamVar`: `Connection`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, false,
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, true,
|
||||
`MsgStrongRecName`)
|
||||
|
||||
mount `networkVar`.switch,
|
||||
@ -797,7 +821,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
|
||||
proc snappyThunk(`streamVar`: `Connection`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, true,
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, false,
|
||||
`MsgStrongRecName`)
|
||||
|
||||
mount `networkVar`.switch,
|
||||
|
@ -49,70 +49,86 @@ func libp2pInput*(conn: Connection,
|
||||
buffers: initPageBuffers(pageSize),
|
||||
conn: conn)
|
||||
|
||||
proc readSizePrefix(s: AsyncInputStream, maxSize: uint64): Future[int] {.async.} =
|
||||
trace "about to read msg size prefix"
|
||||
var parser: VarintParser[uint64, ProtoBuf]
|
||||
proc readSizePrefix(s: AsyncInputStream,
|
||||
maxSize: uint32): Future[NetRes[uint32]] {.async.} =
|
||||
var parser: VarintParser[uint32, ProtoBuf]
|
||||
while s.readable:
|
||||
case parser.feedByte(s.read)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > maxSize:
|
||||
trace "size prefix outside of range", res
|
||||
return -1
|
||||
return neterr SizePrefixOverflow
|
||||
else:
|
||||
trace "got size prefix", res
|
||||
return int(res)
|
||||
return ok res
|
||||
of Overflow:
|
||||
trace "size prefix overflow"
|
||||
return -1
|
||||
return neterr SizePrefixOverflow
|
||||
of Incomplete:
|
||||
continue
|
||||
|
||||
proc readSszValue(s: AsyncInputStream, MsgType: type): Future[MsgType] {.async.} =
|
||||
let size = await s.readSizePrefix(uint64(MAX_CHUNK_SIZE))
|
||||
if size > 0 and s.readable(size):
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
proc readSszValue(s: AsyncInputStream,
|
||||
size: int,
|
||||
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
||||
if s.readable(size):
|
||||
s.withReadableRange(size, r):
|
||||
return r.readValue(SSZ, MsgType)
|
||||
else:
|
||||
raise newException(CatchableError,
|
||||
"Failed to read an incoming message size prefix")
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
proc readResponseCode(s: AsyncInputStream): Future[Result[bool, string]] {.async.} =
|
||||
if s.readable:
|
||||
let responseCode = s.read
|
||||
static: assert responseCode.type.low == 0
|
||||
if responseCode > ResponseCode.high.byte:
|
||||
return err("Invalid response code")
|
||||
proc readChunkPayload(s: AsyncInputStream,
|
||||
noSnappy: bool,
|
||||
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
||||
let prefix = await readSizePrefix(s, MAX_CHUNK_SIZE)
|
||||
let size = if prefix.isOk: prefix.value.int
|
||||
else: return err(prefix.error)
|
||||
|
||||
case ResponseCode(responseCode):
|
||||
of InvalidRequest, ServerError:
|
||||
return err(await s.readSszValue(string))
|
||||
of Success:
|
||||
return ok true
|
||||
if size > 0:
|
||||
let processingFut = if noSnappy:
|
||||
readSszValue(s, size, MsgType)
|
||||
else:
|
||||
executePipeline(uncompressFramedStream,
|
||||
readSszValue(size, MsgType))
|
||||
|
||||
return await processingFut
|
||||
else:
|
||||
return ok false
|
||||
return neterr ZeroSizePrefix
|
||||
|
||||
proc readChunk(s: AsyncInputStream,
|
||||
MsgType: typedesc): Future[Option[MsgType]] {.async.} =
|
||||
let rc = await s.readResponseCode()
|
||||
if rc.isOk:
|
||||
if rc[]:
|
||||
return some(await readSszValue(s, MsgType))
|
||||
else:
|
||||
trace "Failed to read response code",
|
||||
reason = rc.error
|
||||
proc readResponseChunk(s: AsyncInputStream,
|
||||
noSnappy: bool,
|
||||
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
|
||||
let responseCodeByte = s.read
|
||||
|
||||
static: assert ResponseCode.low.ord == 0
|
||||
if responseCodeByte > ResponseCode.high.byte:
|
||||
return neterr InvalidResponseCode
|
||||
|
||||
let responseCode = ResponseCode responseCodeByte
|
||||
case responseCode:
|
||||
of InvalidRequest, ServerError:
|
||||
let errorMsgChunk = await readChunkPayload(s, noSnappy, string)
|
||||
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
|
||||
else: return err(errorMsgChunk.error)
|
||||
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
|
||||
responseCode: responseCode,
|
||||
errorMsg: errorMsg)
|
||||
of Success:
|
||||
discard
|
||||
|
||||
return await readChunkPayload(s, noSnappy, MsgType)
|
||||
|
||||
proc readResponse(s: AsyncInputStream,
|
||||
MsgType: type): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
noSnappy: bool,
|
||||
MsgType: type): Future[NetRes[MsgType]] {.gcsafe, async.} =
|
||||
when MsgType is seq:
|
||||
type E = ElemType(MsgType)
|
||||
var results: MsgType
|
||||
while true:
|
||||
let nextRes = await s.readChunk(E)
|
||||
if nextRes.isNone: break
|
||||
results.add nextRes.get
|
||||
if results.len > 0:
|
||||
return some(results)
|
||||
while s.readable:
|
||||
results.add(? await s.readResponseChunk(noSnappy, E))
|
||||
return ok results
|
||||
else:
|
||||
return await s.readChunk(MsgType)
|
||||
if s.readable:
|
||||
return await s.readResponseChunk(noSnappy, MsgType)
|
||||
else:
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
|
@ -1,22 +1,26 @@
|
||||
# TODO: How can this be tested?
|
||||
proc uncompressFramedStream*(conn: Connection, output: OutputStream): Future[Result[void, cstring]] {.async.} =
|
||||
proc uncompressFramedStream*(conn: Connection,
|
||||
output: OutputStream,
|
||||
expectedSize: int): Future[Result[void, cstring]]
|
||||
{.async.} =
|
||||
var header: array[STREAM_HEADER.len, byte]
|
||||
try:
|
||||
await conn.readExactly(addr header[0], header.len)
|
||||
except LPStreamEOFError:
|
||||
return err "Unexpected EOF before snappy header"
|
||||
|
||||
if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.len-1):
|
||||
if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high):
|
||||
return err "Incorrect snappy header"
|
||||
|
||||
var totalBytesDecompressed = 0
|
||||
var uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)
|
||||
|
||||
while true:
|
||||
while totalBytesDecompressed < expectedSize:
|
||||
var frameHeader: array[4, byte]
|
||||
try:
|
||||
await conn.readExactly(addr frameHeader[0], frameHeader.len)
|
||||
except LPStreamEOFError:
|
||||
return ok()
|
||||
break
|
||||
|
||||
let x = uint32.fromBytesLE frameHeader
|
||||
let id = x and 0xFF
|
||||
@ -37,7 +41,7 @@ proc uncompressFramedStream*(conn: Connection, output: OutputStream): Future[Res
|
||||
|
||||
let
|
||||
crc = uint32.fromBytesLE frameData[0..3]
|
||||
uncompressedLen = snappyUncompress(frameData.toOpenArray(4, frameData.len - 1), uncompressedData)
|
||||
uncompressedLen = snappyUncompress(frameData.toOpenArray(4, frameData.high), uncompressedData)
|
||||
|
||||
if uncompressedLen <= 0:
|
||||
return err "Failed to decompress snappy frame"
|
||||
@ -45,14 +49,18 @@ proc uncompressFramedStream*(conn: Connection, output: OutputStream): Future[Res
|
||||
if not checkCrcAndAppend(output, uncompressedData.toOpenArray(0, uncompressedLen-1), crc):
|
||||
return err "Snappy content CRC checksum failed"
|
||||
|
||||
totalBytesDecompressed += uncompressedLen
|
||||
|
||||
elif id == UNCOMPRESSED_DATA_IDENTIFIER:
|
||||
if dataLen < 4:
|
||||
return err "Snappy frame size too low to contain CRC checksum"
|
||||
|
||||
let crc = uint32.fromBytesLE frameData[0..3]
|
||||
if not checkCrcAndAppend(output, frameData.toOpenArray(4, frameData.len - 1), crc):
|
||||
if not checkCrcAndAppend(output, frameData.toOpenArray(4, frameData.high), crc):
|
||||
return err "Snappy content CRC checksum failed"
|
||||
|
||||
totalBytesDecompressed += frameData.len - 4
|
||||
|
||||
elif id < 0x80:
|
||||
# Reserved unskippable chunks (chunk types 0x02-0x7f)
|
||||
# if we encounter this type of chunk, stop decoding
|
||||
@ -64,125 +72,101 @@ proc uncompressFramedStream*(conn: Connection, output: OutputStream): Future[Res
|
||||
# including STREAM_HEADER (0xff) should be skipped
|
||||
continue
|
||||
|
||||
proc readChunk(conn: Connection,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
|
||||
return ok()
|
||||
|
||||
proc readSizePrefix(conn: Connection,
|
||||
deadline: Future[void]): Future[int] {.async.} =
|
||||
maxSize: uint32): Future[NetRes[uint32]] {.async.} =
|
||||
trace "about to read msg size prefix"
|
||||
var parser: VarintParser[uint64, ProtoBuf]
|
||||
while true:
|
||||
var nextByte: byte
|
||||
var readNextByte = conn.readExactly(addr nextByte, 1)
|
||||
await readNextByte or deadline
|
||||
if not readNextByte.finished:
|
||||
trace "size prefix byte not received in time"
|
||||
return -1
|
||||
case parser.feedByte(nextByte)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > uint64(MAX_CHUNK_SIZE):
|
||||
trace "size prefix outside of range", res
|
||||
return -1
|
||||
var parser: VarintParser[uint32, ProtoBuf]
|
||||
try:
|
||||
while true:
|
||||
var nextByte: byte
|
||||
await conn.readExactly(addr nextByte, 1)
|
||||
case parser.feedByte(nextByte)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > maxSize:
|
||||
return neterr SizePrefixOverflow
|
||||
else:
|
||||
return ok res
|
||||
of Overflow:
|
||||
return neterr SizePrefixOverflow
|
||||
of Incomplete:
|
||||
continue
|
||||
except LPStreamEOFError:
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
proc readChunkPayload(conn: Connection,
|
||||
noSnappy: bool,
|
||||
MsgType: type): Future[NetRes[MsgType]] {.async.} =
|
||||
let prefix = await readSizePrefix(conn, MAX_CHUNK_SIZE)
|
||||
let size = if prefix.isOk: prefix.value.int
|
||||
else: return err(prefix.error)
|
||||
|
||||
if size > 0:
|
||||
if noSnappy:
|
||||
var bytes = newSeq[byte](size)
|
||||
await conn.readExactly(addr bytes[0], bytes.len)
|
||||
return ok SSZ.decode(bytes, MsgType)
|
||||
else:
|
||||
var snappyOutput = memoryOutput()
|
||||
let status = await conn.uncompressFramedStream(snappyOutput, size)
|
||||
if status.isOk:
|
||||
var decompressedBytes = snappyOutput.getOutput
|
||||
if decompressedBytes.len != size:
|
||||
return neterr InvalidSnappyBytes
|
||||
else:
|
||||
return ok SSZ.decode(decompressedBytes, MsgType)
|
||||
else:
|
||||
trace "got size prefix", res
|
||||
return int(res)
|
||||
of Overflow:
|
||||
trace "size prefix overflow"
|
||||
return -1
|
||||
of Incomplete:
|
||||
continue
|
||||
|
||||
proc readMsgBytes(conn: Connection,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Bytes] {.async.} =
|
||||
trace "about to read message bytes", withResponseCode
|
||||
return neterr InvalidSnappyBytes
|
||||
else:
|
||||
return neterr ZeroSizePrefix
|
||||
|
||||
proc readResponseChunk(conn: Connection,
|
||||
noSnappy: bool,
|
||||
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
|
||||
try:
|
||||
if withResponseCode:
|
||||
var responseCode: byte
|
||||
trace "about to read response code"
|
||||
var readResponseCode = conn.readExactly(addr responseCode, 1)
|
||||
try:
|
||||
await readResponseCode or deadline
|
||||
except LPStreamEOFError:
|
||||
trace "end of stream received"
|
||||
return
|
||||
var responseCodeByte: byte
|
||||
try:
|
||||
await conn.readExactly(addr responseCodeByte, 1)
|
||||
except LPStreamEOFError:
|
||||
return neterr PotentiallyExpectedEOF
|
||||
|
||||
if not readResponseCode.finished:
|
||||
trace "response code not received in time"
|
||||
return
|
||||
static: assert ResponseCode.low.ord == 0
|
||||
if responseCodeByte > ResponseCode.high.byte:
|
||||
return neterr InvalidResponseCode
|
||||
|
||||
if responseCode > ResponseCode.high.byte:
|
||||
trace "invalid response code", responseCode
|
||||
return
|
||||
let responseCode = ResponseCode responseCodeByte
|
||||
case responseCode:
|
||||
of InvalidRequest, ServerError:
|
||||
let errorMsgChunk = await readChunkPayload(conn, noSnappy, string)
|
||||
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
|
||||
else: return err(errorMsgChunk.error)
|
||||
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
|
||||
responseCode: responseCode,
|
||||
errorMsg: errorMsg)
|
||||
of Success:
|
||||
discard
|
||||
|
||||
logScope: responseCode = ResponseCode(responseCode)
|
||||
trace "got response code"
|
||||
return await readChunkPayload(conn, noSnappy, MsgType)
|
||||
|
||||
case ResponseCode(responseCode)
|
||||
of InvalidRequest, ServerError:
|
||||
let responseErrMsg = await conn.readChunk(string, false, deadline)
|
||||
debug "P2P request resulted in error", responseErrMsg
|
||||
return
|
||||
|
||||
of Success:
|
||||
# The response is OK, the execution continues below
|
||||
discard
|
||||
|
||||
var sizePrefix = await conn.readSizePrefix(deadline)
|
||||
trace "got msg size prefix", sizePrefix
|
||||
|
||||
if sizePrefix == -1:
|
||||
debug "Failed to read an incoming message size prefix", peer = conn.peerId
|
||||
return
|
||||
|
||||
if sizePrefix == 0:
|
||||
debug "Received SSZ with zero size", peer = conn.peerId
|
||||
return
|
||||
|
||||
trace "about to read msg bytes", len = sizePrefix
|
||||
var msgBytes = newSeq[byte](sizePrefix)
|
||||
var readBody = conn.readExactly(addr msgBytes[0], sizePrefix)
|
||||
await readBody or deadline
|
||||
if not readBody.finished:
|
||||
trace "msg bytes not received in time"
|
||||
return
|
||||
|
||||
trace "got message bytes", len = sizePrefix
|
||||
return msgBytes
|
||||
|
||||
except TransportIncompleteError:
|
||||
return @[]
|
||||
|
||||
proc readChunk(conn: Connection,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
var msgBytes = await conn.readMsgBytes(withResponseCode, deadline)
|
||||
try:
|
||||
if msgBytes.len > 0:
|
||||
return some SSZ.decode(msgBytes, MsgType)
|
||||
except SerializationError as err:
|
||||
debug "Failed to decode a network message",
|
||||
msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
return
|
||||
|
||||
proc readResponse(
|
||||
conn: Connection,
|
||||
MsgType: type,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
except LPStreamEOFError:
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
proc readResponse(conn: Connection,
|
||||
noSnappy: bool,
|
||||
MsgType: type): Future[NetRes[MsgType]] {.gcsafe, async.} =
|
||||
when MsgType is seq:
|
||||
type E = ElemType(MsgType)
|
||||
var results: MsgType
|
||||
while true:
|
||||
let nextRes = await conn.readChunk(E, true, deadline)
|
||||
if nextRes.isNone: break
|
||||
results.add nextRes.get
|
||||
if results.len > 0:
|
||||
return some(results)
|
||||
let nextRes = await conn.readResponseChunk(noSnappy, E)
|
||||
if nextRes.isErr:
|
||||
if nextRes.error.kind == PotentiallyExpectedEOF:
|
||||
return ok results
|
||||
return err nextRes.error
|
||||
else:
|
||||
results.add nextRes.value
|
||||
else:
|
||||
return await conn.readChunk(MsgType, true, deadline)
|
||||
return await conn.readResponseChunk(noSnappy, MsgType)
|
||||
|
||||
|
@ -26,7 +26,7 @@ proc fetchAncestorBlocksFromPeer(
|
||||
# blocks starting N positions before this slot number.
|
||||
try:
|
||||
let blocks = await peer.beaconBlocksByRoot([rec.root])
|
||||
if blocks.isSome:
|
||||
if blocks.isOk:
|
||||
for b in blocks.get:
|
||||
responseHandler(b)
|
||||
except CatchableError as err:
|
||||
@ -41,7 +41,7 @@ proc fetchAncestorBlocksFromNetwork(
|
||||
try:
|
||||
peer = await network.peerPool.acquire()
|
||||
let blocks = await peer.beaconBlocksByRoot([rec.root])
|
||||
if blocks.isSome:
|
||||
if blocks.isOk:
|
||||
for b in blocks.get:
|
||||
responseHandler(b)
|
||||
except CatchableError as err:
|
||||
|
@ -1,7 +1,7 @@
|
||||
import chronicles
|
||||
import options, deques, heapqueue, tables, strutils, sequtils
|
||||
import stew/bitseqs, chronos, chronicles
|
||||
import spec/datatypes, spec/digest, peer_pool
|
||||
import spec/datatypes, spec/digest, peer_pool, eth2_network
|
||||
export datatypes, digest, chronos, chronicles
|
||||
|
||||
logScope:
|
||||
@ -64,7 +64,7 @@ type
|
||||
queue: SyncQueue
|
||||
|
||||
SyncManagerError* = object of CatchableError
|
||||
OptionBeaconBlocks* = Option[seq[SignedBeaconBlock]]
|
||||
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
|
||||
|
||||
proc getShortMap*(req: SyncRequest,
|
||||
data: openarray[SignedBeaconBlock]): string =
|
||||
@ -257,7 +257,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
||||
)
|
||||
|
||||
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
|
||||
req: SyncRequest): Future[OptionBeaconBlocks] {.async.} =
|
||||
req: SyncRequest): Future[BeaconBlocksRes] {.async.} =
|
||||
mixin beaconBlocksByRange, getScore, `==`
|
||||
doAssert(not(req.isEmpty()), "Request must not be empty!")
|
||||
debug "Requesting blocks from peer", peer = peer,
|
||||
@ -270,7 +270,7 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
|
||||
errMsg = workFut.readError().msg, topics = "syncman"
|
||||
else:
|
||||
let res = workFut.read()
|
||||
if res.isNone():
|
||||
if res.isErr:
|
||||
debug "Error, while reading getBlocks response",
|
||||
peer = peer, slot = req.slot, count = req.count,
|
||||
step = req.step, topics = "syncman"
|
||||
@ -368,7 +368,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
|
||||
peer_score = peer.getScore(), topics = "syncman"
|
||||
|
||||
let blocks = await man.getBlocks(peer, req)
|
||||
if blocks.isSome():
|
||||
if blocks.isOk:
|
||||
let data = blocks.get()
|
||||
let smap = getShortMap(req, data)
|
||||
debug "Received blocks on request", blocks_count = len(data),
|
||||
|
@ -95,7 +95,7 @@ p2pProtocol BeaconSync(version = 1,
|
||||
# respond in time due to high CPU load in our single thread.
|
||||
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
|
||||
|
||||
if theirStatus.isSome:
|
||||
if theirStatus.isOk:
|
||||
await peer.handleStatus(peer.networkState,
|
||||
ourStatus, theirStatus.get())
|
||||
else:
|
||||
@ -193,8 +193,8 @@ proc updateStatus*(peer: Peer): Future[bool] {.async.} =
|
||||
result = false
|
||||
else:
|
||||
let theirStatus = theirFut.read()
|
||||
if theirStatus.isSome():
|
||||
peer.setStatusMsg(theirStatus.get())
|
||||
if theirStatus.isOk:
|
||||
peer.setStatusMsg(theirStatus.get)
|
||||
result = true
|
||||
|
||||
proc hasInitialStatus*(peer: Peer): bool {.inline.} =
|
||||
|
2
vendor/nim-eth
vendored
2
vendor/nim-eth
vendored
@ -1 +1 @@
|
||||
Subproject commit b2afb82b9135a37b1c11974b010a648166578024
|
||||
Subproject commit 67bb54b6a5d6bb29d13fd80eab382cff14a7f341
|
2
vendor/nim-snappy
vendored
2
vendor/nim-snappy
vendored
@ -1 +1 @@
|
||||
Subproject commit 20cc8ce1c26e5fbf36e094062dc103440d1c7c6c
|
||||
Subproject commit b4cd68e27a56dbda2a56d7b90e666b644cfd5be6
|
2
vendor/nim-stew
vendored
2
vendor/nim-stew
vendored
@ -1 +1 @@
|
||||
Subproject commit c500d3dda1f3cb9df0aed8ded83ef58af293cfb1
|
||||
Subproject commit d0f5be4971ad34d115b9749d9fb69bdd2aecf525
|
Loading…
x
Reference in New Issue
Block a user