Eliminate the code duplication in the LibP2P back-ends

This commit is contained in:
Zahary Karadjov 2019-12-09 17:28:57 +02:00 committed by zah
parent 889031453f
commit 98656377a3
3 changed files with 442 additions and 770 deletions

View File

@ -17,6 +17,8 @@ export
p2pProtocol, libp2p_json_serialization, ssz
type
P2PStream = Connection
# TODO Is this really needed?
Eth2Node* = ref object of RootObj
switch*: Switch
@ -41,14 +43,9 @@ type
Disconnecting,
Disconnected
DisconnectionReason* = enum
ClientShutDown
IrrelevantNetwork
FaultOrError
UntypedResponder = object
peer*: Peer
stream*: Connection
stream*: P2PStream
Responder*[MsgType] = distinct UntypedResponder
@ -77,14 +74,17 @@ type
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: Connection): Future[void] {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
Bytes = seq[byte]
DisconnectionReason* = enum
ClientShutDown
IrrelevantNetwork
FaultOrError
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
@ -94,15 +94,23 @@ type
template `$`*(peer: Peer): string = id(peer.info)
chronicles.formatIt(Peer): $it
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
# TODO: This exists only as a compatibility layer between the daemon
# APIs and the native LibP2P ones. It won't be necessary once the
# daemon is removed.
#
template writeAllBytes(stream: P2PStream, bytes: seq[byte]): untyped =
write(stream, bytes)
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
dial(node.switch, peer.info, protocolId)
proc peer(stream: Connection): PeerID =
proc peer(stream: P2PStream): PeerID =
# TODO: Can this be `none`?
stream.peerInfo.get.peerId
#
# End of compatibility layer
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
let peerId = peerInfo.peerId
@ -111,13 +119,9 @@ proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
result = Peer.init(node, peerInfo)
node.peers[peerId] = result
proc peerFromStream(network: Eth2Node, connection: Connection): Peer {.gcsafe.} =
proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
# TODO: Can this be `none`?
return network.getPeer(connection.peerInfo.get)
proc safeClose(connection: Connection) {.async.} =
if not connection.closed:
await close(connection)
return network.getPeer(stream.peerInfo.get)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
@ -127,36 +131,13 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = fals
peer.connectionState = Disconnected
peer.network.peers.del(peer.info.peerId)
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc safeClose(stream: P2PStream) {.async.} =
if not stream.closed:
await close(stream)
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string,
reason = FaultOrError): auto =
const errMsg = errMsgExpr
debug errMsg
disconnectAndRaise(peer, reason, errMsg)
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc init*(T: type Eth2Node, switch: Switch): T =
new result
@ -175,215 +156,6 @@ proc init*(T: type Eth2Node, switch: Switch): T =
proc start*(node: Eth2Node) {.async.} =
node.libp2pTransportLoops = await node.switch.start()
proc readChunk(stream: Connection,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(stream: Connection,
deadline: Future[void]): Future[int] {.async.} =
var parser: VarintParser[uint64, ProtoBuf]
while true:
var nextByte: byte
var readNextByte = stream.readExactly(addr nextByte, 1)
await readNextByte or deadline
if not readNextByte.finished:
return -1
case parser.feedByte(nextByte)
of Done:
let res = parser.getResult
if res > uint64(REQ_RESP_MAX_SIZE):
return -1
else:
return int(res)
of Overflow:
return -1
of Incomplete:
continue
proc readMsgBytes(stream: Connection,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
try:
if withResponseCode:
var responseCode: byte
var readResponseCode = stream.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished:
return
if responseCode > ResponseCode.high.byte: return
logScope: responseCode = ResponseCode(responseCode)
case ResponseCode(responseCode)
of InvalidRequest, ServerError:
let responseErrMsg = await readChunk(stream, string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
return
of Success:
# The response is OK, the execution continues below
discard
var sizePrefix = await readSizePrefix(stream, deadline)
if sizePrefix == -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer
return
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
return
var msgBytes = newSeq[byte](sizePrefix)
var readBody = stream.readExactly(addr msgBytes[0], sizePrefix)
await readBody or deadline
if not readBody.finished: return
return msgBytes
except TransportIncompleteError:
return @[]
proc readChunk(stream: Connection,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
try:
if msgBytes.len > 0:
return some SSZ.decode(msgBytes, MsgType)
except SerializationError as err:
debug "Failed to decode a network message",
msgBytes, errMsg = err.formatMsg("<msg>")
return
proc readResponse(
stream: Connection,
MsgType: type,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await readChunk(stream, E, true, deadline)
if nextRes.isNone: break
results.add nextRes.get
if results.len > 0:
return some(results)
else:
return await readChunk(stream, MsgType, true, deadline)
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = init OutputStream
s.append byte(responseCode)
s.appendVarint errMsg.len
s.appendValue SSZ, errMsg
s.getOutput
template writeAllBytes(stream: Connection, bytes: seq[byte]): untyped =
# TODO: This exists only as a compatibility layer between the daemon
# APIs and the native LibP2P ones. It won't be necessary once the
# daemon is removed.
stream.write(bytes)
proc sendErrorResponse(peer: Peer,
stream: Connection,
err: ref SerializationError,
msgName: string,
msgBytes: Bytes) {.async.} =
debug "Received an invalid request",
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
await stream.writeAllBytes(responseBytes)
await stream.close()
proc sendErrorResponse(peer: Peer,
stream: Connection,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request", peer, responseCode, errMsg
let responseBytes = encodeErrorMsg(ServerError, errMsg)
await stream.writeAllBytes(responseBytes)
await stream.close()
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT
var streamFut = peer.network.switch.dial(peer.info, protocolId)
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
defer:
await safeClose(stream)
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
# TODO There is too much duplication in the responder functions, but
# I hope to reduce this when I increse the reliance on output streams.
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendVarint payload.len.uint64
s.append payload
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(val)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
var s = init OutputStream
for chunk in chunks:
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(chunk)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut = peer.network.switch.dial(peer.info, protocolId)
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
return none(ResponseMsg)
let stream = streamFut.read
defer:
await safeClose(stream)
# Send the request
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
# Read the response
return await stream.readResponse(ResponseMsg, deadline)
proc p2pStreamName(MsgType: type): string =
mixin msgProtocol, protocolInfo, msgId
MsgType.msgProtocol.protocolInfo.messages[MsgType.msgId].libp2pCodecName
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
new result
result.info = info
@ -396,25 +168,6 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
await all(subProtocolsHandshakes)
template initializeConnection*(peer: Peer): auto =
performProtocolHandshakes(peer)
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
result.name = name
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerMsg(protocol: ProtocolInfo,
name: string,
mounter: MounterProc,
@ -425,91 +178,11 @@ proc registerMsg(protocol: ProtocolInfo,
libp2pCodecName: libp2pCodecName,
printer: printer)
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: Connection): T =
T(UntypedResponder(peer: peer, stream: stream))
template write*[M](r: var Responder[M], val: auto): auto =
mixin send
type Msg = M
type MsgRec = RecType(Msg)
when MsgRec is seq|openarray:
type E = ElemType(MsgRec)
when val is E:
sendResponseChunkObj(UntypedResponder(r), val)
elif val is MsgRec:
sendResponseChunks(UntypedResponder(r), val)
else:
{.fatal: "Unepected message type".}
else:
send(r, val)
proc implementSendProcBody(sendProc: SendProc) =
let
msg = sendProc.msg
UntypedResponder = bindSym "UntypedResponder"
await = ident "await"
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
if msg.kind != msgResponse:
let msgProto = getRequestProtoName(msg.procDef)
case msg.kind
of msgRequest:
let
timeout = msg.timeoutParam[0]
ResponseRecord = msg.response.recName
quote:
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
else:
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
else:
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc handleIncomingStream(network: Eth2Node, stream: Connection,
MsgType, Format: distinct type) {.async, gcsafe.} =
mixin callUserHandler
const msgName = typetraits.name(MsgType)
defer:
await safeClose(stream)
let
deadline = sleepAsync RESP_TIMEOUT
msgBytes = await readMsgBytes(stream, false, deadline)
peer = peerFromStream(network, stream)
if msgBytes.len == 0:
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)
return
var msg: MsgType
try:
msg = decode(Format, msgBytes, MsgType)
except SerializationError as err:
await sendErrorResponse(peer, stream, err, msgName, msgBytes)
return
except Exception as err:
# TODO. This is temporary code that should be removed after interop.
# It can be enabled only in certain diagnostic builds where it should
# re-raise the exception.
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
await sendErrorResponse(peer, stream, ServerError, err.msg)
raise err
try:
logReceivedMsg(peer, msg)
await callUserHandler(peer, stream, msg)
except CatchableError as err:
await sendErrorResponse(peer, stream, ServerError, err.msg)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
Format = ident "SSZ"
Responder = bindSym "Responder"
Connection = bindSym "Connection"
P2PStream = bindSym "P2PStream"
OutputStream = bindSym "OutputStream"
Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node"
@ -538,7 +211,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
result.ResponderType = Responder
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(streamVar, Connection)
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)
result.implementMsg = proc (msg: Message) =
let
@ -551,13 +224,13 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
# should be written:
msg.userHandler.params.insert(2, newIdentDefs(streamVar, Connection))
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
msg.initResponderCall.add streamVar
##
## Implement the Thunk:
##
## The protocol handlers in nim-libp2p receive only a `Connection`
## The protocol handlers in nim-libp2p receive only a `P2PStream`
## parameter and there is no way to access the wider context (such
## as the current `Switch`). In our handlers, we may need to list all
## peers in the current network, so we must keep a reference to the
@ -571,21 +244,16 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
protocolMounterName = ident(msgName & "_mounter")
userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
let tracing = when tracingEnabled:
quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get)
else:
newStmtList()
var mounter: NimNode
if msg.userHandler != nil:
protocol.outRecvProcs.add quote do:
template `callUserHandler`(`peerVar`: `Peer`,
`streamVar`: `Connection`,
`streamVar`: `P2PStream`,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc thunk(`streamVar`: `Connection`,
proc thunk(`streamVar`: `P2PStream`,
proto: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`,
`MsgRecName`, `Format`)

View File

@ -4,6 +4,8 @@ type
InvalidRequest
ServerError
Bytes = seq[byte]
const
defaultIncomingReqTimeout = 5000
HandshakeTimeout = FaultOrError
@ -18,7 +20,7 @@ const
readTimeoutErrorMsg = "Exceeded read timeout for a request"
logScope:
topic = "libp2p"
topics = "libp2p"
template libp2pProtocol*(name: string, version: int) {.pragma.}
@ -36,3 +38,357 @@ proc getRequestProtoName(fn: NimNode): NimNode =
return newLit("")
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(stream: P2PStream,
deadline: Future[void]): Future[int] {.async.} =
trace "about to read msg size prefix"
var parser: VarintParser[uint64, ProtoBuf]
while true:
var nextByte: byte
var readNextByte = stream.readExactly(addr nextByte, 1)
await readNextByte or deadline
if not readNextByte.finished:
trace "size prefix byte not received in time"
return -1
case parser.feedByte(nextByte)
of Done:
let res = parser.getResult
if res > uint64(REQ_RESP_MAX_SIZE):
trace "size prefix outside of range", res
return -1
else:
trace "got size prefix", res
return int(res)
of Overflow:
trace "size prefix overflow"
return -1
of Incomplete:
continue
proc readMsgBytes(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
trace "about to read message bytes", withResponseCode
try:
if withResponseCode:
var responseCode: byte
trace "about to read response code"
var readResponseCode = stream.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished:
trace "response code not received in time"
return
if responseCode > ResponseCode.high.byte:
trace "invalid response code", responseCode
return
logScope: responseCode = ResponseCode(responseCode)
trace "got response code"
case ResponseCode(responseCode)
of InvalidRequest, ServerError:
let responseErrMsg = await readChunk(stream, string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
return
of Success:
# The response is OK, the execution continues below
discard
var sizePrefix = await readSizePrefix(stream, deadline)
trace "got msg size prefix", sizePrefix
if sizePrefix == -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer
return
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
return
trace "about to read msg bytes"
var msgBytes = newSeq[byte](sizePrefix)
var readBody = stream.readExactly(addr msgBytes[0], sizePrefix)
await readBody or deadline
if not readBody.finished:
trace "msg bytes not received in time"
return
trace "got message bytes", msgBytes
return msgBytes
except TransportIncompleteError:
return @[]
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
try:
if msgBytes.len > 0:
return some SSZ.decode(msgBytes, MsgType)
except SerializationError as err:
debug "Failed to decode a network message",
msgBytes, errMsg = err.formatMsg("<msg>")
return
proc readResponse(
stream: P2PStream,
MsgType: type,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await readChunk(stream, E, true, deadline)
if nextRes.isNone: break
results.add nextRes.get
if results.len > 0:
return some(results)
else:
return await readChunk(stream, MsgType, true, deadline)
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = init OutputStream
s.append byte(responseCode)
s.appendVarint errMsg.len
s.appendValue SSZ, errMsg
s.getOutput
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
err: ref SerializationError,
msgName: string,
msgBytes: Bytes) {.async.} =
debug "Received an invalid request",
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
await stream.writeAllBytes(responseBytes)
await stream.close()
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request", peer, responseCode, errMsg
let responseBytes = encodeErrorMsg(ServerError, errMsg)
await stream.writeAllBytes(responseBytes)
await stream.close()
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT
var streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
defer:
await safeClose(stream)
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
# TODO There is too much duplication in the responder functions, but
# I hope to reduce this when I increse the reliance on output streams.
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendVarint payload.len.uint64
s.append payload
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(val)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
var s = init OutputStream
for chunk in chunks:
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(chunk)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
return none(ResponseMsg)
let stream = streamFut.read
defer:
await safeClose(stream)
# Send the request
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
# Read the response
return await stream.readResponse(ResponseMsg, deadline)
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
T(UntypedResponder(peer: peer, stream: stream))
template write*[M](r: var Responder[M], val: auto): auto =
mixin send
type Msg = M
type MsgRec = RecType(Msg)
when MsgRec is seq|openarray:
type E = ElemType(MsgRec)
when val is E:
sendResponseChunkObj(UntypedResponder(r), val)
elif val is MsgRec:
sendResponseChunks(UntypedResponder(r), val)
else:
{.fatal: "Unepected message type".}
else:
send(r, val)
proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
await all(subProtocolsHandshakes)
template initializeConnection*(peer: Peer): auto =
performProtocolHandshakes(peer)
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
result.name = name
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
proc implementSendProcBody(sendProc: SendProc) =
let
msg = sendProc.msg
UntypedResponder = bindSym "UntypedResponder"
await = ident "await"
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
if msg.kind != msgResponse:
let msgProto = getRequestProtoName(msg.procDef)
case msg.kind
of msgRequest:
let
timeout = msg.timeoutParam[0]
ResponseRecord = msg.response.recName
quote:
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
else:
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
else:
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
MsgType, Format: distinct type) {.async, gcsafe.} =
mixin callUserHandler
const msgName = typetraits.name(MsgType)
## Uncomment this to enable tracing on all incoming requests
## You can include `msgNameLit` in the condition to select
## more specific requests:
# when chronicles.runtimeFilteringEnabled:
# setLogLevel(LogLevel.TRACE)
# defer: setLogLevel(LogLevel.DEBUG)
# trace "incoming " & `msgNameLit` & " stream"
defer:
await safeClose(stream)
let
deadline = sleepAsync RESP_TIMEOUT
msgBytes = await readMsgBytes(stream, false, deadline)
peer = peerFromStream(network, stream)
if msgBytes.len == 0:
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)
return
var msg: MsgType
try:
msg = decode(Format, msgBytes, MsgType)
except SerializationError as err:
await sendErrorResponse(peer, stream, err, msgName, msgBytes)
return
except Exception as err:
# TODO. This is temporary code that should be removed after interop.
# It can be enabled only in certain diagnostic builds where it should
# re-raise the exception.
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
await sendErrorResponse(peer, stream, ServerError, err.msg)
raise err
try:
logReceivedMsg(peer, msg)
await callUserHandler(peer, stream, msg)
except CatchableError as err:
await sendErrorResponse(peer, stream, ServerError, err.msg)

View File

@ -1,5 +1,5 @@
import
algorithm,
algorithm, typetraits,
stew/varints, stew/shims/[macros, tables], chronos, chronicles,
libp2p/daemon/daemonapi, faststreams/output_stream, serialization,
json_serialization/std/options, eth/p2p/p2p_protocol_dsl,
@ -31,11 +31,6 @@ type
Disconnecting,
Disconnected
DisconnectionReason* = enum
ClientShutDown
IrrelevantNetwork
FaultOrError
UntypedResponder = object
peer*: Peer
stream*: P2PStream
@ -73,22 +68,36 @@ type
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
Bytes = seq[byte]
DisconnectionReason* = enum
ClientShutDown
IrrelevantNetwork
FaultOrError
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
TransmissionError* = object of CatchableError
logScope:
topic = "libp2p"
template `$`*(peer: Peer): string = $peer.id
chronicles.formatIt(Peer): $it
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
# TODO: These exists only as a compatibility layer between the daemon
# APIs and the native LibP2P ones. It won't be necessary once the
# daemon is removed.
#
proc writeAllBytes(stream: P2PStream, bytes: seq[byte]) {.async.} =
let sent = await stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver msg bytes")
template readExactly(stream: P2PStream, dst: pointer, dstLen: int): untyped =
readExactly(stream.transp, dst, dstLen)
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
openStream(node.daemon, peer.id, @[protocolId])
#
# End of compatibility layer
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.}
@ -99,14 +108,10 @@ proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
node.peers[peerId] = result
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer =
getPeer(node, peerInfo.peer)
node.getPeer(peerInfo.peer)
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
Eth2Node(daemon.userData).getPeer(stream.peer)
proc safeClose(stream: P2PStream) {.async.} =
if P2PStreamFlags.Closed notin stream.flags:
await close(stream)
proc peerFromStream(node: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
node.getPeer(stream.peer)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
@ -116,30 +121,13 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = fals
peer.connectionState = Disconnected
peer.network.peers.del(peer.id)
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc safeClose(stream: P2PStream) {.async.} =
if P2PStreamFlags.Closed notin stream.flags:
await close(stream)
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
new result
@ -153,243 +141,9 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
result.protocolStates[proto.index] = proto.networkStateInitializer(result)
for msg in proto.messages:
if msg.libp2pProtocol.len > 0:
if msg.libp2pProtocol.len > 0 and msg.thunk != nil:
await daemon.addHandler(@[msg.libp2pProtocol], msg.thunk)
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(transp: StreamTransport,
deadline: Future[void]): Future[int] {.async.} =
trace "about to read msg size prefix"
var parser: VarintParser[uint64, ProtoBuf]
while true:
var nextByte: byte
var readNextByte = transp.readExactly(addr nextByte, 1)
await readNextByte or deadline
if not readNextByte.finished:
trace "size prefix byte not received in time"
return -1
case parser.feedByte(nextByte)
of Done:
let res = parser.getResult
if res > uint64(REQ_RESP_MAX_SIZE):
trace "size prefix outside of range", res
return -1
else:
trace "got size prefix", res
return int(res)
of Overflow:
trace "size prefix overflow"
return -1
of Incomplete:
continue
proc readMsgBytes(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
trace "about to read message bytes", withResponseCode
try:
if withResponseCode:
var responseCode: byte
trace "about to read response code"
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished:
trace "response code not received in time"
return
if responseCode > ResponseCode.high.byte:
trace "invalid response code", responseCode
return
logScope: responseCode = ResponseCode(responseCode)
trace "got response code"
case ResponseCode(responseCode)
of InvalidRequest, ServerError:
let responseErrMsg = await readChunk(stream, string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
return
of Success:
# The response is OK, the execution continues below
discard
var sizePrefix = await readSizePrefix(stream.transp, deadline)
trace "got msg size prefix", sizePrefix
if sizePrefix == -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer
return
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
return
trace "about to read msg bytes"
var msgBytes = newSeq[byte](sizePrefix)
var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix)
await readBody or deadline
if not readBody.finished:
trace "msg bytes not received in time"
return
trace "got message bytes", msgBytes
return msgBytes
except TransportIncompleteError:
return @[]
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
try:
if msgBytes.len > 0:
return some SSZ.decode(msgBytes, MsgType)
except SerializationError as err:
debug "Failed to decode a network message",
msgBytes, errMsg = err.formatMsg("<msg>")
return
proc readResponse(
stream: P2PStream,
MsgType: type,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await readChunk(stream, E, true, deadline)
if nextRes.isNone: break
results.add nextRes.get
if results.len > 0:
return some(results)
else:
return await readChunk(stream, MsgType, true, deadline)
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = init OutputStream
s.append byte(responseCode)
s.appendVarint errMsg.len
s.appendValue SSZ, errMsg
s.getOutput
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
err: ref SerializationError,
msgName: string,
msgBytes: Bytes) {.async.} =
debug "Received an invalid request",
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
discard await stream.transp.write(responseBytes)
await stream.close()
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request", peer, responseCode, errMsg
let responseBytes = encodeErrorMsg(ServerError, errMsg)
discard await stream.transp.write(responseBytes)
await stream.close()
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT
var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId])
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
defer:
await safeClose(stream)
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
let sent = await stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver msg bytes")
# TODO There is too much duplication in the responder functions, but
# I hope to reduce this when I increse the reliance on output streams.
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendVarint payload.len.uint64
s.append payload
let bytes = s.getOutput
let sent = await responder.stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(val)
let bytes = s.getOutput
let sent = await responder.stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
var s = init OutputStream
for chunk in chunks:
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(chunk)
let bytes = s.getOutput
let sent = await responder.stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId])
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
return none(ResponseMsg)
let stream = streamFut.read
defer:
await safeClose(stream)
# Send the request
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
let sent = await stream.transp.write(bytes)
if sent != bytes.len:
await disconnectAndRaise(peer, FaultOrError, "Incomplete send")
# Read the response
return await stream.readResponse(ResponseMsg, deadline)
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
new result
result.id = id
@ -402,25 +156,6 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
await all(subProtocolsHandshakes)
template initializeConnection*(peer: Peer): auto =
performProtocolHandshakes(peer)
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
result.name = name
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerMsg(protocol: ProtocolInfo,
name: string,
thunk: ThunkProc,
@ -431,53 +166,6 @@ proc registerMsg(protocol: ProtocolInfo,
libp2pProtocol: libp2pProtocol,
printer: printer)
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
T(UntypedResponder(peer: peer, stream: stream))
import
typetraits
template write*[M](r: var Responder[M], val: auto): auto =
mixin send
type Msg = M
type MsgRec = RecType(Msg)
when MsgRec is seq|openarray:
type E = ElemType(MsgRec)
when val is E:
sendResponseChunkObj(UntypedResponder(r), val)
elif val is MsgRec:
sendResponseChunks(UntypedResponder(r), val)
else:
static: echo "BAD TYPE ", name(E), " vs ", name(type(val))
{.fatal: "bad".}
else:
send(r, val)
proc implementSendProcBody(sendProc: SendProc) =
let
msg = sendProc.msg
UntypedResponder = bindSym "UntypedResponder"
await = ident "await"
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
if msg.kind != msgResponse:
let msgProto = getRequestProtoName(msg.procDef)
case msg.kind
of msgRequest:
let
timeout = msg.timeoutParam[0]
ResponseRecord = msg.response.recName
quote:
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
else:
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
else:
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
Format = ident "SSZ"
@ -497,6 +185,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msgBytesVar = ident "msgBytes"
daemonVar = ident "daemon"
await = ident "await"
callUserHandler = ident "callUserHandler"
p.useRequestIds = false
p.useSingleRecordInlining = true
@ -518,7 +207,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
protocol = msg.protocol
msgName = $msg.ident
msgNameLit = newLit msgName
msgRecName = msg.recName
MsgRecName = msg.recName
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
@ -529,64 +218,23 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
##
## Implemenmt Thunk
##
var thunkName = ident(msgName & "_thunk")
let awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar])
var thunkName: NimNode
let tracing = when tracingEnabled:
quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get)
if msg.userHandler != nil:
thunkName = ident(msgName & "_thunk")
let userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
msg.defineThunk quote do:
template `callUserHandler`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`): Future[void] {.gcsafe.} =
return handleIncomingStream(`Eth2Node`(`daemonVar`.userData), `streamVar`,
`MsgRecName`, `Format`)
else:
newStmtList()
msg.defineThunk quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`) {.async, gcsafe.} =
## Uncomment this to enable tracing on all incoming requests
## You can include `msgNameLit` in the condition to select
## more specific requests:
# when chronicles.runtimeFilteringEnabled:
# setLogLevel(LogLevel.TRACE)
# defer: setLogLevel(LogLevel.DEBUG)
# trace "incoming " & `msgNameLit` & " stream"
defer:
`await` safeClose(`streamVar`)
let
`deadlineVar` = sleepAsync RESP_TIMEOUT
`msgBytesVar` = `await` readMsgBytes(`streamVar`, false, `deadlineVar`)
`peerVar` = peerFromStream(`daemonVar`, `streamVar`)
if `msgBytesVar`.len == 0:
`await` sendErrorResponse(`peerVar`, `streamVar`,
ServerError, readTimeoutErrorMsg)
return
var `msgVar`: `msgRecName`
try:
trace "about to decode incoming msg"
`msgVar` = decode(`Format`, `msgBytesVar`, `msgRecName`)
except SerializationError as `errVar`:
`await` sendErrorResponse(`peerVar`, `streamVar`, `errVar`,
`msgNameLit`, `msgBytesVar`)
return
except Exception as err:
# TODO. This is temporary code that should be removed after interop.
# It can be enabled only in certain diagnostic builds where it should
# re-raise the exception.
debug "Crash during serialization", inputBytes = toHex(`msgBytesVar`),
msgName = `msgNameLit`,
deserializedType = astToStr(`msgRecName`)
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, err.msg)
try:
`tracing`
trace "about to execute user handler"
`awaitUserHandler`
except CatchableError as `errVar`:
try:
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
except CatchableError:
debug "Failed to deliver error response", peer = `peerVar`
thunkName = newNilLit()
##
## Implement Senders and Handshake
@ -603,7 +251,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msgNameLit,
thunkName,
getRequestProtoName(msg.procDef),
newTree(nnkBracketExpr, messagePrinter, msgRecName)))
newTree(nnkBracketExpr, messagePrinter, MsgRecName)))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)