Make the APIs compatible with libp2p

Lib2P2 handles RPC requests and responses with separate streams
while DEV2P2 is relying on tagged messages transmitted over a
single stream. To cover both models through the same application
code, we introduce a new `response` variable in the request handlers.
The user is supposed to issue a call to `response.send` in order to
reply to the request. Please note that the `response.send` signature
is strongly typed and depends on the current message.
This commit is contained in:
Zahary Karadjov 2019-03-11 11:22:06 +02:00 committed by zah
parent eb1a04e93b
commit 3efec171a6
8 changed files with 175 additions and 134 deletions

View File

@ -13,7 +13,8 @@ requires "nim >= 0.19.0",
"rocksdb",
"package_visible_types",
"chronos",
"chronicles"
"chronicles",
"std_shims"
import strutils
import oswalkdir, ospaths # In newer nim these are merged to os

View File

@ -0,0 +1,66 @@
proc getState(peer: Peer, proto: ProtocolInfo): RootRef =
peer.protocolStates[proto.index]
template state*(peer: Peer, Protocol: type): untyped =
## Returns the state object of a particular protocol for a
## particular connection.
mixin State
bind getState
cast[Protocol.State](getState(peer, Protocol.protocolInfo))
proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef =
node.protocolStates[proto.index]
template protocolState*(node: EthereumNode, Protocol: type): untyped =
mixin NetworkState
bind getNetworkState
cast[Protocol.NetworkState](getNetworkState(node, Protocol.protocolInfo))
template networkState*(connection: Peer, Protocol: type): untyped =
## Returns the network state object of a particular protocol for a
## particular connection.
protocolState(connection.network, Protocol)
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard
proc createPeerState[ProtocolState](peer: Peer): RootRef =
var res = new ProtocolState
mixin initProtocolState
initProtocolState(res, peer)
return cast[RootRef](res)
proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} =
var res = new NetworkState
mixin initProtocolState
initProtocolState(res, network)
return cast[RootRef](res)
proc chooseFieldType(n: NimNode): NimNode =
## Examines the parameter types used in the message signature
## and selects the corresponding field type for use in the
## message object type (i.e. `p2p.hello`).
##
## For now, only openarray types are remapped to sequences.
result = n
if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"):
result = n.copyNimTree
result[0] = ident("seq")
proc popTimeoutParam(n: NimNode): NimNode =
var lastParam = n.params[^1]
if eqIdent(lastParam[0], "timeout"):
if lastParam[2].kind == nnkEmpty:
macros.error "You must specify a default value for the `timeout` parameter", lastParam
result = lastParam
n.params.del(n.params.len - 1)
proc verifyStateType(t: NimNode): NimNode =
result = t[1]
if result.kind == nnkSym and $result == "nil":
return nil
if result.kind != nnkBracketExpr or $result[0] != "ref":
macros.error($result & " must be a ref type")
proc newFuture[T](location: var Future[T]) =
location = newFuture[T]()

View File

@ -1,11 +1,8 @@
import
private/p2p_types
const tracingEnabled* = defined(p2pdump)
const tracingEnabled = defined(p2pdump)
when tracingEnabled:
import
macros,
macros, typetraits,
serialization, json_serialization/writer,
chronicles, chronicles_tail/configuration
@ -25,7 +22,7 @@ when tracingEnabled:
template logRecord(eventName: static[string], args: varargs[untyped]) =
p2pMessages.log LogLevel.NONE, eventName, topics = "p2pdump", args
proc initTracing*(baseProtocol: ProtocolInfo,
proc initTracing(baseProtocol: ProtocolInfo,
userProtocols: seq[ProtocolInfo]) =
once:
var w = init StringJsonWriter
@ -48,25 +45,26 @@ when tracingEnabled:
proc logMsgEventImpl(eventName: static[string],
peer: Peer,
protocol: ProtocolInfo,
msgId: int,
msgName: string,
json: string) =
# this is kept as a separate proc to reduce the code bloat
logRecord eventName, port = int(peer.network.address.tcpPort),
peer = $peer.remote,
protocol = protocol.name,
msgId, data = JsonString(json)
msg = msgName,
data = JsonString(json)
proc logMsgEvent[Msg](eventName: static[string], peer: Peer, msg: Msg) =
mixin msgProtocol, protocolInfo, msgId
logMsgEventImpl(eventName, peer,
Msg.msgProtocol.protocolInfo,
Msg.msgId,
Msg.type.name,
StringJsonWriter.encode(msg))
proc logSentMsgFields*(peer: NimNode,
proc logSentMsgFields(peer: NimNode,
protocolInfo: NimNode,
msgId: int,
msgName: string,
fields: openarray[NimNode]): NimNode =
## This generates the tracing code inserted in the message sending procs
## `fields` contains all the params that were serialized in the message
@ -82,35 +80,35 @@ when tracingEnabled:
result.add quote do:
endRecord(`tracer`)
logMsgEventImpl("outgoing_msg", `peer`,
`protocolInfo`, `msgId`, getOutput(`tracer`))
`protocolInfo`, `msgName`, getOutput(`tracer`))
template logSentMsg*(peer: Peer, msg: auto) =
template logSentMsg(peer: Peer, msg: auto) =
logMsgEvent("outgoing_msg", peer, msg)
template logReceivedMsg*(peer: Peer, msg: auto) =
template logReceivedMsg(peer: Peer, msg: auto) =
logMsgEvent("incoming_msg", peer, msg)
template logConnectedPeer*(p: Peer) =
template logConnectedPeer(p: Peer) =
logRecord "peer_connected",
port = int(p.network.address.tcpPort),
peer = $p.remote
template logAcceptedPeer*(p: Peer) =
template logAcceptedPeer(p: Peer) =
logRecord "peer_accepted",
port = int(p.network.address.tcpPort),
peer = $p.remote
template logDisconnectedPeer*(p: Peer) =
template logDisconnectedPeer(p: Peer) =
logRecord "peer_disconnected",
port = int(p.network.address.tcpPort),
peer = $p.remote
else:
template initTracing*(baseProtocol: ProtocolInfo,
template initTracing(baseProtocol: ProtocolInfo,
userProtocols: seq[ProtocolInfo])= discard
template logSentMsg*(peer: Peer, msg: auto) = discard
template logReceivedMsg*(peer: Peer, msg: auto) = discard
template logConnectedPeer*(peer: Peer) = discard
template logAcceptedPeer*(peer: Peer) = discard
template logDisconnectedPeer*(peer: Peer) = discard
template logSentMsg(peer: Peer, msg: auto) = discard
template logReceivedMsg(peer: Peer, msg: auto) = discard
template logConnectedPeer(peer: Peer) = discard
template logAcceptedPeer(peer: Peer) = discard
template logDisconnectedPeer(peer: Peer) = discard

View File

@ -170,3 +170,9 @@ type
MessageTimeout,
SubprotocolReason = 0x10
ResponseWithId*[MsgType] = object
peer*: Peer
id*: int
Response*[MsgType] = distinct Peer

View File

@ -1,15 +1,12 @@
import
macros, tables, algorithm, deques, hashes, options, typetraits,
chronicles, nimcrypto, chronos, eth/[rlp, common, keys],
private/p2p_types, kademlia, auth, rlpxcrypt, enode, p2p_tracing
std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys],
private/p2p_types, kademlia, auth, rlpxcrypt, enode
when useSnappy:
import snappy
const devp2pSnappyVersion* = 5
const
tracingEnabled = defined(p2pdump)
logScope:
topics = "rlpx"
@ -18,6 +15,8 @@ const
defaultReqTimeout = 10000
maxMsgSize = 1024 * 1024
include p2p_tracing
when tracingEnabled:
import
eth/common/eth_types_json_serialization
@ -36,9 +35,6 @@ var
template allProtocols*: auto = {.gcsafe.}: gProtocols
template devp2pInfo: auto = {.gcsafe.}: gDevp2pInfo
proc newFuture[T](location: var Future[T]) =
location = newFuture[T]()
proc `$`*(p: Peer): string {.inline.} =
$p.remote
@ -240,6 +236,10 @@ proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline
if not peer.dispatcher.isNil:
result += peer.dispatcher.protocolOffsets[proto.index]
template getPeer(peer: Peer): auto = peer
template getPeer(response: Response): auto = Peer(response)
template getPeer(response: ResponseWithId): auto = response.peer
proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
## Checks whether a Peer supports a particular protocol
peer.protocolOffset(Protocol) != -1
@ -490,6 +490,8 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
warn "Dropped RLPX message",
msg = peer.dispatcher.messages[nextMsgId].name
include p2p_backends_helpers
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific RLPx message.
## Any messages received while waiting will be dispatched to their
@ -525,77 +527,6 @@ proc dispatchMessages*(peer: Peer) {.async.} =
(msgInfo.nextMsgResolver)(msgData, peer.awaitedMessages[msgId])
peer.awaitedMessages[msgId] = nil
iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
for i in (1 + skip) ..< n.params.len:
let paramNodes = n.params[i]
let paramType = paramNodes[^2]
for j in 0 ..< paramNodes.len - 2:
yield (paramNodes[j], paramType)
proc chooseFieldType(n: NimNode): NimNode =
## Examines the parameter types used in the message signature
## and selects the corresponding field type for use in the
## message object type (i.e. `p2p.hello`).
##
## For now, only openarray types are remapped to sequences.
result = n
if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"):
result = n.copyNimTree
result[0] = ident("seq")
proc getState(peer: Peer, proto: ProtocolInfo): RootRef =
peer.protocolStates[proto.index]
template state*(peer: Peer, Protocol: type): untyped =
## Returns the state object of a particular protocol for a
## particular connection.
mixin State
bind getState
cast[Protocol.State](getState(peer, Protocol.protocolInfo))
proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef =
node.protocolStates[proto.index]
template protocolState*(node: EthereumNode, Protocol: type): untyped =
mixin NetworkState
bind getNetworkState
cast[Protocol.NetworkState](getNetworkState(node, Protocol.protocolInfo))
template networkState*(connection: Peer, Protocol: type): untyped =
## Returns the network state object of a particular protocol for a
## particular connection.
protocolState(connection.network, Protocol)
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard
proc createPeerState[ProtocolState](peer: Peer): RootRef =
var res = new ProtocolState
mixin initProtocolState
initProtocolState(res, peer)
return cast[RootRef](res)
proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} =
var res = new NetworkState
mixin initProtocolState
initProtocolState(res, network)
return cast[RootRef](res)
proc popTimeoutParam(n: NimNode): NimNode =
var lastParam = n.params[^1]
if eqIdent(lastParam[0], "timeout"):
if lastParam[2].kind == nnkEmpty:
macros.error "You must specify a default value for the `timeout` parameter", lastParam
result = lastParam
n.params.del(n.params.len - 1)
proc verifyStateType(t: NimNode): NimNode =
result = t[1]
if result.kind == nnkSym and $result == "nil":
return nil
if result.kind != nnkBracketExpr or $result[0] != "ref":
macros.error($result & " must be a ref type")
macro p2pProtocolImpl(name: static[string],
version: static[uint],
body: untyped,
@ -632,6 +563,7 @@ macro p2pProtocolImpl(name: static[string],
protoNameIdent = ident(protoName)
resultIdent = ident "result"
perProtocolMsgId = ident"perProtocolMsgId"
response = ident"response"
currentProtocolSym = ident"CurrentProtocol"
protocol = ident(protoName & "Protocol")
isSubprotocol = version > 0'u
@ -674,7 +606,10 @@ macro p2pProtocolImpl(name: static[string],
template applyDecorator(p: NimNode, decorator: NimNode) =
if decorator.kind != nnkNilLit: p.addPragma decorator
proc augmentUserHandler(userHandlerProc: NimNode, msgId = -1, msgKind = rlpxNotification) =
proc augmentUserHandler(userHandlerProc: NimNode,
msgId = -1,
msgKind = rlpxNotification,
extraDefinitions: NimNode = nil) =
## Turns a regular proc definition into an async proc and adds
## the helpers for accessing the peer and network protocol states.
case msgKind
@ -696,6 +631,9 @@ macro p2pProtocolImpl(name: static[string],
userHandlerDefinitions.add quote do:
type `currentProtocolSym` = `protoNameIdent`
if extraDefinitions != nil:
userHandlerDefinitions.add extraDefinitions
if msgId >= 0:
userHandlerDefinitions.add quote do:
const `perProtocolMsgId` = `msgId`
@ -729,7 +667,7 @@ macro p2pProtocolImpl(name: static[string],
responseMsgId = -1,
responseRecord: NimNode = nil): NimNode =
if n[0].kind == nnkPostfix:
macros.error("rlpxProcotol procs are public by default. " &
macros.error("p2pProcotol procs are public by default. " &
"Please remove the postfix `*`.", n)
let
@ -742,6 +680,7 @@ macro p2pProtocolImpl(name: static[string],
# variables used in the sending procs
msgRecipient = ident"msgRecipient"
sendTo = ident"sendTo"
reqTimeout: NimNode
rlpWriter = ident"writer"
appendParams = newNimNode(nnkStmtList)
@ -819,14 +758,25 @@ macro p2pProtocolImpl(name: static[string],
addr(`receivedMsg`),
`reqIdVal`)
if hasReqIds:
paramsToWrite.add reqId
paramsToWrite.add newDotExpr(sendTo, ident"id")
if n.body.kind != nnkEmpty:
# implement the receiving thunk proc that deserialzed the
# message parameters and calls the user proc:
userHandlerProc = n.copyNimTree
userHandlerProc.name = genSym(nskProc, msgName)
augmentUserHandler userHandlerProc, msgId, msgKind
var extraDefs: NimNode
if msgKind == rlpxRequest:
let peer = userHandlerProc.params[1][0]
if hasReqIds:
extraDefs = quote do:
let `response` = ResponseWithId[`responseRecord`](peer: `peer`, id: `reqId`)
else:
extraDefs = quote do:
let `response` = Response[`responseRecord`](`peer`)
augmentUserHandler userHandlerProc, msgId, msgKind, extraDefs
# This is the call to the user supplied handled. Here we add only the
# initial peer param, while the rest of the params will be added later.
@ -909,8 +859,11 @@ macro p2pProtocolImpl(name: static[string],
template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent`
var msgSendProc = n
let msgSendProcName = n.name
outSendProcs.add msgSendProc
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = msgRecipient
msgSendProc.params[1][0] = sendTo
msgSendProc.addPragma ident"gcsafe"
# Add a timeout parameter for all request procs
@ -918,8 +871,20 @@ macro p2pProtocolImpl(name: static[string],
of rlpxRequest:
msgSendProc.params.add reqTimeout
of rlpxResponse:
if useRequestIds:
msgSendProc.params.insert 2, newIdentDefs(reqId, ident"int")
# A response proc must be called with a response object that originates
# from a certain request. Here we change the Peer parameter at position
# 1 to the correct strongly-typed ResponseType. The incoming procs still
# gets the normal Peer paramter.
let
ResponseTypeHead = if useRequestIds: bindSym"ResponseWithId"
else: bindSym"Response"
ResponseType = newTree(nnkBracketExpr, ResponseTypeHead, msgRecord)
msgSendProc.params[1][1] = ResponseType
outSendProcs.add quote do:
template send*(r: `ResponseType`, args: varargs[untyped]): auto =
`msgSendProcName`(r, args)
else: discard
# We change the return type of the sending proc to a Future.
@ -946,7 +911,7 @@ macro p2pProtocolImpl(name: static[string],
# `sendMsg` call.
quote: return `sendCall`
let `perPeerMsgIdValue` = if isSubprotocol:
let perPeerMsgIdValue = if isSubprotocol:
newCall(perPeerMsgIdImpl, msgRecipient, protocol, newLit(msgId))
else:
newLit(msgId)
@ -975,6 +940,7 @@ macro p2pProtocolImpl(name: static[string],
# let paramCountNode = newLit(paramCount)
msgSendProc.body = quote do:
let `msgRecipient` = getPeer(`sendTo`)
`initWriter`
`appendParams`
`finalizeRequest`
@ -983,8 +949,6 @@ macro p2pProtocolImpl(name: static[string],
if msgKind == rlpxRequest:
msgSendProc.applyDecorator outgoingRequestDecorator
outSendProcs.add msgSendProc
outProcRegistrations.add(
newCall(bindSym("registerMsg"),
protocol,
@ -1052,7 +1016,7 @@ macro p2pProtocolImpl(name: static[string],
elif eqIdent(n[0], "onPeerDisconnected"):
disconnectHandler = liftEventHandler(n[1], "PeerDisconnect")
else:
macros.error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
macros.error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
of nnkProcDef:
discard addMsgHandler(nextId, n)
inc nextId
@ -1061,7 +1025,7 @@ macro p2pProtocolImpl(name: static[string],
discard
else:
macros.error("illegal syntax in a RLPx protocol definition", n)
macros.error("illegal syntax in a P2P protocol definition", n)
let peerInit = if peerState == nil: newNilLit()
else: newTree(nnkBracketExpr, createPeerState, peerState)

View File

@ -80,7 +80,7 @@ p2pProtocol eth(version = protocolVersion,
await peer.disconnect(BreachOfProtocol)
return
await peer.blockHeaders(peer.network.chain.getBlockHeaders(request))
await response.send(peer.network.chain.getBlockHeaders(request))
proc blockHeaders(p: Peer, headers: openarray[BlockHeader])
@ -90,7 +90,7 @@ p2pProtocol eth(version = protocolVersion,
await peer.disconnect(BreachOfProtocol)
return
await peer.blockBodies(peer.network.chain.getBlockBodies(hashes))
await response.send(peer.network.chain.getBlockBodies(hashes))
proc blockBodies(peer: Peer, blocks: openarray[BlockBody])
@ -101,13 +101,13 @@ p2pProtocol eth(version = protocolVersion,
requestResponse:
proc getNodeData(peer: Peer, hashes: openarray[KeccakHash]) =
await peer.nodeData(peer.network.chain.getStorageNodes(hashes))
await response.send(peer.network.chain.getStorageNodes(hashes))
proc nodeData(peer: Peer, data: openarray[Blob])
requestResponse:
proc getReceipts(peer: Peer, hashes: openarray[KeccakHash]) =
await peer.receipts(peer.network.chain.getReceipts(hashes))
await response.send(peer.network.chain.getReceipts(hashes))
proc receipts(peer: Peer, receipts: openarray[Receipt])

View File

@ -287,7 +287,7 @@ p2pProtocol les(version = lesVersion,
costQuantity(req.maxResults.int, max = maxHeadersFetch).} =
let headers = peer.network.chain.getBlockHeaders(req)
await peer.blockHeaders(reqId, updateBV(), headers)
await response.send(updateBV(), headers)
proc blockHeaders(
peer: Peer,
@ -304,7 +304,7 @@ p2pProtocol les(version = lesVersion,
costQuantity(blocks.len, max = maxBodiesFetch), gcsafe.} =
let blocks = peer.network.chain.getBlockBodies(blocks)
await peer.blockBodies(reqId, updateBV(), blocks)
await response.send(updateBV(), blocks)
proc blockBodies(
peer: Peer,
@ -318,7 +318,7 @@ p2pProtocol les(version = lesVersion,
{.costQuantity(hashes.len, max = maxReceiptsFetch).} =
let receipts = peer.network.chain.getReceipts(hashes)
await peer.receipts(reqId, updateBV(), receipts)
await response.send(updateBV(), receipts)
proc receipts(
peer: Peer,
@ -332,7 +332,7 @@ p2pProtocol les(version = lesVersion,
costQuantity(proofs.len, max = maxProofsFetch).} =
let proofs = peer.network.chain.getProofs(proofs)
await peer.proofs(reqId, updateBV(), proofs)
await response.send(updateBV(), proofs)
proc proofs(
peer: Peer,
@ -346,7 +346,7 @@ p2pProtocol les(version = lesVersion,
costQuantity(reqs.len, max = maxCodeFetch).} =
let results = peer.network.chain.getContractCodes(reqs)
await peer.contractCodes(reqId, updateBV(), results)
await response.send(updateBV(), results)
proc contractCodes(
peer: Peer,
@ -362,7 +362,7 @@ p2pProtocol les(version = lesVersion,
costQuantity(reqs.len, max = maxHeaderProofsFetch).} =
let proofs = peer.network.chain.getHeaderProofs(reqs)
await peer.headerProofs(reqId, updateBV(), proofs)
await response.send(updateBV(), proofs)
proc headerProofs(
peer: Peer,
@ -377,7 +377,7 @@ p2pProtocol les(version = lesVersion,
var nodes, auxData: seq[Blob]
peer.network.chain.getHelperTrieProofs(reqs, nodes, auxData)
await peer.helperTrieProofs(reqId, updateBV(), nodes, auxData)
await response.send(updateBV(), nodes, auxData)
proc helperTrieProofs(
peer: Peer,
@ -409,7 +409,7 @@ p2pProtocol les(version = lesVersion,
results.add s
await peer.txStatus(reqId, updateBV(), results)
await response.send(updateBV(), results)
proc getTxStatus(
peer: Peer,
@ -421,7 +421,7 @@ p2pProtocol les(version = lesVersion,
var results: seq[TransactionStatusMsg]
for t in transactions:
results.add chain.getTransactionStatus(t.rlpHash)
await peer.txStatus(reqId, updateBV(), results)
await response.send(updateBV(), results)
proc txStatus(
peer: Peer,

View File

@ -42,7 +42,7 @@ p2pProtocol abc(version = 1,
requestResponse:
proc abcReq(p: Peer, n: int) =
echo "got req ", n
await p.abcRes(reqId, &"response to #{n}")
await response.send(&"response to #{n}")
proc abcRes(p: Peer, data: string) =
echo "got response ", data
@ -89,6 +89,12 @@ template asyncTest(name, body: untyped) =
proc scenario {.async.} = body
waitFor scenario()
template sendResponseWithId(peer: Peer, proto, msg: untyped, reqId: int, data: varargs[untyped]): auto =
msg(ResponseWithId[proto.msg](peer: peer, id: reqId), data)
template sendResponse(peer: Peer, proto, msg: untyped, data: varargs[untyped]): auto =
msg(Response[proto.msg](peer), data)
asyncTest "network with 3 peers using custom protocols":
const useCompression = defined(useSnappy)
let localKeys = newKeyPair()
@ -101,7 +107,7 @@ asyncTest "network with 3 peers using custom protocols":
m.expect(abc.abcReq) do (peer: Peer, data: Rlp):
let reqId = data.readReqId()
await peer.abcRes(reqId, "mock response")
await sendResponseWithId(peer, abc, abcRes, reqId, "mock response")
await sleepAsync(100)
let r = await peer.abcReq(1)
assert r.get.data == "response to #1"
@ -116,7 +122,7 @@ asyncTest "network with 3 peers using custom protocols":
m.expect(xyz.xyzReq) do (peer: Peer):
echo "got xyz req"
await peer.xyzRes("mock peer data")
await sendResponse(peer, xyz, xyzRes, "mock peer data")
when useCompression:
m.useCompression = useCompression