Implement all libp2p_native response codes as specified in the latest proposal
This commit is contained in:
@ -68,6 +68,12 @@ type
CompressedMsgId = tuple
protocolIdx, methodId: int
ResponseCode* = enum
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
@ -81,6 +87,8 @@ type
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
TransmissionError* = object of CatchableError
defaultIncomingReqTimeout = 5000
defaultOutgoingReqTimeout = 10000
@ -88,9 +96,9 @@ const
IrrelevantNetwork* = UselessPeer
include libp2p_backends_common
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
new result
@ -107,40 +115,111 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
if msg.libp2pProtocol.len > 0:
await daemon.addHandler(@[msg.libp2pProtocol], msg.thunk)
proc readMsg(stream: P2PStream, MsgType: type,
deadline: Future[void]): Future[Option[MsgType]] {.async.} =
proc readMsg(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readMsgBytes(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
if withResponseCode:
var responseCode: byte
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished: return
if responseCode > ResponseCode.high.byte: return
logScope: responseCode = ResponseCode(responseCode)
case ResponseCode(responseCode)
of InvalidRequest:
debug "P2P request was classified as invalid"
of EncodingError, ServerError:
let responseErrMsg = await readMsg(stream, string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
of Success:
# The response is OK, the execution continues below
var sizePrefix: uint32
var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix))
await readSizePrefix or deadline
if not readSizePrefix.finished: return
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
var msgBytes = newSeq[byte]( + sizeof(sizePrefix))
copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix))
var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)],
await readBody or deadline
if not readBody.finished: return
let decoded = SSZ.decode(msgBytes, MsgType)
return msgBytes
proc readMsgBytesOrClose(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
result = await stream.readMsgBytes(withResponseCode, deadline)
if result.len == 0: await stream.close()
proc readMsg(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytesOrClose(withResponseCode, deadline)
return some(decoded)
except SerializationError:
if msgBytes.len > 0: return some SSZ.decode(msgBytes, MsgType)
except SerializationError as err:
debug "Failed to decode a network message",
msgBytes, errMsg = err.formatMsg("<msg>")
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
err: ref SerializationError,
msgName: string,
msgBytes: Bytes) {.async.} =
debug "Received an invalid request",
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
var responseCode = byte(EncodingError)
discard await stream.transp.write(addr responseCode, 1)
await stream.close()
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request",
peer, responseCode, errMsg
var outputStream = init OutputStream
outputStream.append byte(responseCode)
outputStream.appendValue SSZ, errMsg
discard await stream.transp.write(outputStream.getOutput)
await stream.close()
proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var stream = await, @[protocolId])
# TODO how does openStream fail? Set a timeout here and handle it
let sent = await stream.transp.write(requestBytes)
# TODO: Should I check that `sent` is equal to the desired number of bytes
if sent != requestBytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} =
let sent = await stream.transp.write(bytes)
# TODO: Should I check that `sent` is equal to the desired number of bytes
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout = 10.seconds): Future[Option[ResponseMsg]] {.async.} =
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut =, @[protocolId])
await streamFut or deadline
@ -154,59 +233,52 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
await disconnectAndRaise(peer, FaultOrError, "Incomplete send")
# Read the response
return await stream.readMsg(ResponseMsg, deadline)
return await stream.readMsg(ResponseMsg, true, deadline)
proc exchangeHandshake(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[ResponseMsg] {.gcsafe, async.} =
var response = await makeEth2Request(peer, protocolId, requestBytes,
ResponseMsg, timeout)
if not response.isSome:
await peer.disconnectAndRaise(BreachOfProtocol, "Failed to complete a handshake")
return response.get
proc p2pStreamName(MsgType: type): string =
mixin msgProtocol, protocolInfo, msgId
template handshakeImpl(HandshakeTypeExpr: untyped,
template handshakeImpl(outputStreamVar, handshakeSerializationCall: untyped,
lowLevelThunk: untyped,
HandshakeType: untyped,
# TODO: we cannot use a type parameter above
# because of the following Nim issue:
peerExpr: Peer,
streamExpr: P2PStream,
lazySendCall: Future[void],
timeoutExpr: Duration): auto =
# We make sure the inputs are evaluated only once.
stream = streamExpr
peer = peerExpr
timeout = timeoutExpr
# TODO: This is a work-around for a Nim issue. Please note that it's
# semantically wrong, so if you get a compilation failure, try to
# remove it (perhaps Nim got fixed)
type HandshakeType = type(HandshakeTypeExpr)
proc asyncStep(stream: P2PStream): Future[HandshakeType] {.async.} =
let deadline = sleepAsync timeout
var stream = stream
peer: Peer,
stream: P2PStream,
timeout: Duration): auto =
if stream == nil:
try: stream = await openStream(,,
# TODO openStream should accept Duration
int milliseconds(timeout))
except CatchableError:
const errMsg = "Failed to open LIBP2P stream"
debug errMsg,
stream = p2pStreamName(HandshakeType),
err = getCurrentExceptionMsg()
await disconnectAndRaise(peer, FaultOrError, errMsg)
# Please pay attention that `lazySendCall` is evaluated lazily here.
# For this reason `handshakeImpl` must remain a template.
await lazySendCall
let response = await readMsg(stream, HandshakeType, deadline)
if response.isSome:
return response.get
var outputStreamVar = init OutputStream
exchangeHandshake(peer, p2pStreamName(HandshakeType),
getOutput(outputStreamVar), HandshakeType, timeout)
await disconnectAndRaise(peer, BreachOfProtocol, "Handshake not completed in time")
except CatchableError:
await reraiseAsPeerDisconnected(peer, "Failure during handshake")
proc asyncStep: Future[HandshakeType] {.async.} =
let deadline = sleepAsync timeout
var responseFut = nextMsg(peer, HandshakeType)
await lowLevelThunk(, stream) or deadline
if not responseFut.finished:
await disconnectAndRaise(peer, BreachOfProtocol, "Failed to complete a handshake")
var outputStreamVar = init OutputStream
append(outputStreamVar, byte(Success))
await sendBytes(stream, getOutput(outputStreamVar))
proc resolveNextMsgFutures(peer: Peer, msg: auto) =
type MsgType = type(msg)
@ -239,16 +311,6 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} =
template initializeConnection*(peer: Peer): auto =
template getRecipient(peer: Peer): Peer =
# TODO: this should be removed eventually
template getRecipient(stream: P2PStream): P2PStream =
template getRecipient(response: Responder): Peer =
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
@ -257,19 +319,6 @@ proc initProtocol(name: string,
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
proc registerMsg(protocol: ProtocolInfo,
name: string,
thunk: ThunkProc,
@ -281,18 +330,7 @@ proc registerMsg(protocol: ProtocolInfo,
printer: printer)
proc getRequestProtoName(fn: NimNode): NimNode =
when true:
return newLit("rpc/" & $
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
return pragma[1]
error "All stream opening procs must have the 'libp2pProtocol' pragma specified.", fn
return newLit("/ETH/BeaconChain/" & $ & "/1/SSZ")
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
@ -301,30 +339,41 @@ proc init*[MsgType](T: type Responder[MsgType],
proc implementSendProcBody(sendProc: SendProc) =
msg = sendProc.msg
peer = sendProc.peerParam
timeout = sendProc.timeoutParam
ResponseRecord = if msg.response != nil: msg.response.recIdent else: nil
UntypedResponder = bindSym "UntypedResponder"
sendMsg = bindSym "sendMsg"
sendBytes = bindSym "sendBytes"
makeEth2Request = bindSym "makeEth2Request"
await = ident "await"
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
if msg.kind != msgResponse:
let msgProto = getRequestProtoName(msg.procDef)
case msg.kind
of msgRequest:
let timeout = msg.timeoutParam[0]
quote: `makeEth2Request`(`peer`, `msgProto`, `bytes`,
timeout = msg.timeoutParam[0]
ResponseRecord = msg.response.recIdent
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
of msgHandshake:
quote: `sendBytes`(`peer`, `bytes`)
timeout = msg.timeoutParam[0]
HandshakeRecord = msg.recIdent
exchangeHandshake(`peer`, `msgProto`, `bytes`,
`HandshakeRecord`, `timeout`)
quote: `sendMsg`(`peer`, `msgProto`, `bytes`)
quote: sendMsg(`peer`, `msgProto`, `bytes`)
quote: `sendBytes`(`UntypedResponder`(`peer`).stream, `bytes`)
quote: sendBytes(`UntypedResponder`(`peer`).stream, `bytes`)
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc prependResponseCode(stream: NimNode): NimNode =
quote: append(`stream`, byte(Success))
let preSerializationStep = if msg.kind == msgResponse:
sendProc.useStandardBody(preSerializationStep, nil, sendCallGenerator)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
@ -332,20 +381,18 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
Responder = bindSym "Responder"
DaemonAPI = bindSym "DaemonAPI"
P2PStream = ident "P2PStream"
OutputStream = bindSym "OutputStream"
Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node"
messagePrinter = bindSym "messagePrinter"
peerFromStream = bindSym "peerFromStream"
handshakeImpl = bindSym "handshakeImpl"
resolveNextMsgFutures = bindSym "resolveNextMsgFutures"
milliseconds = bindSym "milliseconds"
registerMsg = bindSym "registerMsg"
initProtocol = bindSym "initProtocol"
bindSymOp = bindSym "bindSym"
receivedMsg = ident "msg"
errVar = ident "err"
msgVar = ident "msg"
msgBytesVar = ident "msgBytes"
daemonVar = ident "daemon"
streamVar = ident "stream"
deadlineVar = ident "deadline"
await = ident "await"
p.useRequestIds = false
@ -366,6 +413,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
protocol = msg.protocol
msgName = $msg.ident
msgNameLit = newLit msgName
msgRecName = msg.recIdent
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
@ -377,27 +425,65 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
## Implemenmt Thunk
let awaitUserHandler = msg.genAwaitUserHandler(
newCall("get", receivedMsg), [peerVar, streamVar])
var thunkName = ident(msgName & "_thunk")
requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar])
let tracing = when tracingEnabled:
quote do: logReceivedMsg(`streamVar`.peer, `receivedMsg`.get)
quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get)
msg.defineThunk quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`) {.async, gcsafe.} =
requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
thunkName = ident(msgName & "_thunk")
`deadlineVar` = sleepAsync `requestDataTimeout`
`msgBytesVar` = `await` readMsgBytes(`streamVar`, false, `deadlineVar`)
`peerVar` = peerFromStream(`daemonVar`, `streamVar`)
if `msgBytesVar`.len == 0:
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError,
"Exceeded read timeout for a request")
var `msgVar`: `msgRecName`
`msgVar` = decode(`Format`, `msgBytesVar`, `msgRecName`)
except SerializationError as `errVar`:
`await` sendErrorResponse(`peerVar`, `streamVar`, `errVar`,
`msgNameLit`, `msgBytesVar`)
resolveNextMsgFutures(`peerVar`, `msgVar`)
except CatchableError as `errVar`:
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
## Implement Senders and Handshake
if msg.kind == msgHandshake:
# In LibP2P protocols, the handshake thunk is special. Instead of directly
# deserializing the incoming message and calling the user-supplied handler,
# we execute the `onPeerConnected` handler instead.
# The `onPeerConnected` handler is executed symmetrically for both peers
# and it's expected that one of its very first steps would be to send the
# handshake and then await the same from the other side. We call this step
# "handshakeExchanger".
# For the initiating peer, the handshakeExchanger opens a stream and sends
# a regular request through it, but on the receiving side, it just setups
# a future and call the lower-level thunk that will complete it.
handshake = msg.protocol.onPeerConnected
lowLevelThunkName = $thunkName
msg.defineThunk if msg.kind == msgHandshake:
# In LibP2P protocols, the `onPeerConnected` handler is executed when the
# other peer opens a stream. Contrary to other thunk procs, the message is
# not immediately deserialized. Instead, the handshake "sender proc" acts
# as an exchanger that sends our handshake message while deserializing the
# contents of the other peer's handshake.
# Thus, the very first communication act of the `onPeerConnected` handler
# must be the execution of the handshake exchanger.
let handshake = msg.protocol.onPeerConnected
if handshake.isNil:
macros.error "A LibP2P protocol with a handshake must also include an " &
"`onPeerConnected` handler.", msg.procDef
@ -409,61 +495,64 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
proc `handshakeProcName`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`) {.async, gcsafe.}
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`): Future[void] {.gcsafe.} =
let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`)
return `handshakeProcName`(`peerVar`, `streamVar`)
# Here we replace the 'thunkProc' that will be registered as a handler
# for incoming messages:
thunkName = ident(msgName & "_handleConnection")
msg.protocol.outRecvProcs.add quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`) {.async, gcsafe.} =
var `deadlineVar` = sleepAsync `requestDataTimeout`
var `receivedMsg` = `await` readMsg(`streamVar`,
if `receivedMsg`.isNone:
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`)
`resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`))
let `peerVar` = peerFromStream(`daemonVar`, `streamVar`)
`await` `handshakeProcName`(`peerVar`, `streamVar`)
except SerializationError as err:
debug "Failed to decode message",
err = err.formatMsg("<msg>"),
msg = `msgNameLit`,
peer = $(`streamVar`.peer)
`await` disconnect(`peerVar`, FaultOrError)
except CatchableError as err:
debug "Failed to complete handshake", err = err.msg
`await` disconnect(`peerVar`, FaultOrError)
## Implement Senders and Handshake
var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake))
implementSendProcBody sendProc
if msg.kind == msgHandshake:
rawSendProc = newLit($
handshakeSerializer = msg.createSerializer()
handshakeSerializerName = newLit($
handshakeExchanger = msg.createSendProc(nnkMacroDef)
paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def)
bindSym = ident "bindSym"
handshakeTypeName = newLit($msg.recIdent)
getAst = ident "getAst"
res = ident "result"
handshakeExchanger.setBody quote do:
stream = ident "stream"
rawSendProc = `bindSymOp` `rawSendProc`
outputStreamVar = ident "outputStream"
lowLevelThunk = ident `lowLevelThunkName`
HandshakeType = ident `handshakeTypeName`
params = `paramsArray`
lazySendCall = newCall(rawSendProc, params)
peer = params[0]
timeout = params[^1]
handshakeSerializationCall = newCall(`bindSymOp` `handshakeSerializerName`, params)
lazySendCall[1] = stream
lazySendCall.del(lazySendCall.len - 1)
handshakeSerializationCall[1] = outputStreamVar
handshakeSerializationCall.del(handshakeSerializationCall.len - 1)
return `getAst`(`handshakeImpl`(`msgRecName`, peer, stream, lazySendCall, timeout))
`res` = `getAst`(handshakeImpl(outputStreamVar, handshakeSerializationCall,
lowLevelThunk, HandshakeType,
peer, stream, timeout))
sendProc.def.params[1][1] = P2PStream
when defined(debugMacros) or defined(debugHandshake):
echo "---- Handshake implementation ----"
echo repr(`res`)
var sendProc = msg.createSendProc()
implementSendProcBody sendProc
newTree(nnkBracketExpr, messagePrinter, msgRecName)))
@ -59,3 +59,16 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
initFuture result
peer.awaitedMessages[awaitedMsgId] = result
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
@ -168,9 +168,9 @@ proc getThunk(protocol: ProtocolInfo, methodId: uint16): ThunkProc =
if >= protocol.messages.len: return nil
include libp2p_backends_common
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.}
@ -390,19 +390,6 @@ proc initProtocol(name: string, version: int,
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
proc registerMsg(protocol: ProtocolInfo,
id: int, name: string,
thunk: ThunkProc,
@ -138,7 +138,7 @@ p2pProtocol BeaconSync(version = 1,
bestRoot: Eth2Digest,
bestSlot: Slot)
proc sendGoodbye(peer: Peer, reason: DisconnectionReason)
proc goodbye(peer: Peer, reason: DisconnectionReason)
proc getStatus(
@ -5,7 +5,7 @@ type
network_type {.strdefine.} = "libp2p_spec"
network_type {.strdefine.} = "libp2p_native"
networkBackend* = when network_type == "rlpx": rlpxBackend
elif network_type == "libp2p_spec": libp2pSpecBackend
Reference in New Issue