nim-eth-p2p/eth_p2p/rlpx.nim
Zahary Karadjov eca93509b4 Support for creating JSON dumps of all P2P network traffic
Enable by compiling with -d:p2pdump. A chronicles log file named
p2p_messages.json will be created in the working directory. This
file will be consumed by the upcoming Chronicles Tail GUI (more
details will be provided on the wiki of this repo).

Other changes:

* Removes the use of package_visible_types (only partially so far)
* Simplifies the new Snappy code a little bit
2018-11-10 02:18:00 +02:00

1383 lines
49 KiB
Nim

import
macros, tables, algorithm, deques, hashes, options, typetraits,
chronicles, nimcrypto, asyncdispatch2, rlp, eth_common, eth_keys,
private/types, kademlia, auth, rlpxcrypt, enode, p2p_tracing
when useSnappy:
import snappy
const devp2pSnappyVersion* = 5
const
tracingEnabled = defined(p2pdump)
logScope:
topics = "rlpx"
const
devp2pVersion* = 4
defaultReqTimeout = 10000
maxMsgSize = 1024 * 1024
when tracingEnabled:
import
eth_common/eth_types_json_serialization
export
# XXX: This is a work-around for a Nim issue.
# See a more detailed comment in p2p_tracing.nim
init, writeValue, getOutput
var
gProtocols: seq[ProtocolInfo]
gDispatchers = initSet[Dispatcher]()
devp2p: ProtocolInfo
# The variables above are immutable RTTI information. We need to tell
# Nim to not consider them GcSafe violations:
template rlpxProtocols*: auto = {.gcsafe.}: gProtocols
template devp2pProtocolInfo: auto = {.gcsafe.}: devp2p
proc newFuture[T](location: var Future[T]) =
location = newFuture[T]()
proc `$`*(p: Peer): string {.inline.} =
$p.remote
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.}
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
# Dispatcher
#
proc hash(d: Dispatcher): int =
hash(d.protocolOffsets)
proc `==`(lhs, rhs: Dispatcher): bool =
lhs.activeProtocols == rhs.activeProtocols
proc describeProtocols(d: Dispatcher): string =
result = ""
for protocol in d.activeProtocols:
if result.len != 0: result.add(',')
for c in protocol.name: result.add(c)
proc numProtocols(d: Dispatcher): int =
d.activeProtocols.len
proc getDispatcher(node: EthereumNode,
otherPeerCapabilities: openarray[Capability]): Dispatcher =
# TODO: sub-optimal solution until progress is made here:
# https://github.com/nim-lang/Nim/issues/7457
# We should be able to find an existing dispatcher without allocating a new one
new(result)
newSeq(result.protocolOffsets, rlpxProtocols.len)
result.protocolOffsets.fill -1
var nextUserMsgId = 0x10
for localProtocol in node.rlpxProtocols:
let idx = localProtocol.index
block findMatchingProtocol:
for remoteCapability in otherPeerCapabilities:
if localProtocol.name == remoteCapability.name and
localProtocol.version == remoteCapability.version:
result.protocolOffsets[idx] = nextUserMsgId
nextUserMsgId += localProtocol.messages.len
break findMatchingProtocol
if result in gDispatchers:
return gDispatchers[result]
else:
template copyTo(src, dest; index: int) =
for i in 0 ..< src.len:
dest[index + i] = addr src[i]
result.messages = newSeq[ptr MessageInfo](nextUserMsgId)
devp2pProtocolInfo.messages.copyTo(result.messages, 0)
for localProtocol in node.rlpxProtocols:
let idx = localProtocol.index
if result.protocolOffsets[idx] != -1:
result.activeProtocols.add localProtocol
localProtocol.messages.copyTo(result.messages,
result.protocolOffsets[idx])
gDispatchers.incl result
proc getMsgName*(peer: Peer, msgId: int): string =
if not peer.dispatcher.isNil and
msgId < peer.dispatcher.messages.len:
return peer.dispatcher.messages[msgId].name
else:
return case msgId
of 0: "hello"
of 1: "disconnect"
of 2: "ping"
of 3: "pong"
else: $msgId
proc getMsgMetadata*(peer: Peer, msgId: int): (ProtocolInfo, ptr MessageInfo) =
doAssert msgId >= 0
if msgId <= devp2p.messages[^1].id:
return (devp2p, addr devp2p.messages[msgId])
if msgId < peer.dispatcher.messages.len:
for i in 0 ..< rlpxProtocols.len:
let offset = peer.dispatcher.protocolOffsets[i]
if offset != -1 and
offset + rlpxProtocols[i].messages[^1].id >= msgId:
return (rlpxProtocols[i], peer.dispatcher.messages[msgId])
# Protocol info objects
#
proc newProtocol(name: string, version: int,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfo =
new result
result.name = name
result.version = version
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
func asCapability*(p: ProtocolInfo): Capability =
result.name = p.name
result.version = p.version
func nameStr*(p: ProtocolInfo): string =
result = newStringOfCap(3)
for c in p.name: result.add(c)
# XXX: this used to be inline, but inline procs
# cannot be passed to closure params
proc cmp*(lhs, rhs: ProtocolInfo): int =
for i in 0..2:
if lhs.name[i] != rhs.name[i]:
return int16(lhs.name[i]) - int16(rhs.name[i])
return 0
proc messagePrinter[MsgType](msg: pointer): string =
result = ""
# TODO: uncommenting the line below increases the compile-time
# tremendously (for reasons not yet known)
# result = $(cast[ptr MsgType](msg)[])
proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) =
var reader = msgData
Future[MsgType](future).complete reader.readRecordType(MsgType, MsgType.rlpFieldsCount > 1)
proc requestResolver[MsgType](msg: pointer, future: FutureBase) =
var f = Future[Option[MsgType]](future)
if not f.finished:
if msg != nil:
f.complete some(cast[ptr MsgType](msg)[])
else:
f.complete none(MsgType)
else:
# This future was already resolved, but let's do some sanity checks
# here. The only reasonable explanation is that the request should
# have timed out.
if msg != nil:
if f.read.isSome:
doAssert false, "trying to resolve a request twice"
else:
doAssert false, "trying to resolve a timed out request with a value"
else:
if not f.read.isSome:
doAssert false, "a request timed out twice"
proc registerMsg(protocol: var ProtocolInfo,
id: int, name: string,
thunk: MessageHandler,
printer: MessageContentPrinter,
requestResolver: RequestResolver,
nextMsgResolver: NextMsgResolver) =
if protocol.messages.len <= id:
protocol.messages.setLen(id + 1)
protocol.messages[id] = MessageInfo(id: id,
name: name,
thunk: thunk,
printer: printer,
requestResolver: requestResolver,
nextMsgResolver: nextMsgResolver)
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
if protocol.version > 0:
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
else:
devp2p = protocol
# Message composition and encryption
#
template protocolOffset(peer: Peer, Protocol: type): int =
peer.dispatcher.protocolOffsets[Protocol.protocolInfo.index]
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline.} =
result = msgId
if not peer.dispatcher.isNil:
result += peer.dispatcher.protocolOffsets[proto.index]
proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
## Checks whether a Peer supports a particular protocol
peer.protocolOffset(Protocol) != -1
template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
rlpOut: var RlpWriter) =
let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
doAssert baseMsgId != -1
rlpOut.append(baseMsgId + msgId)
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
template invalidIdError: untyped =
raise newException(ValueError,
"RLPx message with an invalid id " & $msgId &
" on a connection supporting " & peer.dispatcher.describeProtocols)
if msgId >= peer.dispatcher.messages.len: invalidIdError()
let thunk = peer.dispatcher.messages[msgId].thunk
if thunk == nil: invalidIdError()
return thunk(peer, msgId, msgData)
proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) =
sendFut.addCallback() do(arg: pointer):
if not sendFut.error.isNil:
resFut.fail(sendFut.error)
template compressMsg(peer: Peer, data: Bytes): Bytes =
when useSnappy:
if peer.snappyEnabled:
snappy.compress(data)
else: data
else:
data
proc sendMsg*(peer: Peer, data: Bytes) {.async.} =
var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState)
try:
discard await peer.transport.write(cipherText)
except:
await peer.disconnect(TcpError)
raise
proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
logSentMsg(peer, msg)
var rlpWriter = initRlpWriter()
rlpWriter.append perPeerMsgId(peer, Msg)
rlpWriter.appendRecordType(msg, Msg.rlpFieldsCount > 1)
peer.sendMsg rlpWriter.finish
proc registerRequest*(peer: Peer,
timeout: int,
responseFuture: FutureBase,
responseMsgId: int): int =
inc peer.lastReqId
result = peer.lastReqId
let timeoutAt = fastEpochTime() + uint64(timeout)
let req = OutstandingRequest(id: result,
future: responseFuture,
timeoutAt: timeoutAt)
peer.outstandingRequests[responseMsgId].addLast req
assert(not peer.dispatcher.isNil)
let requestResolver = peer.dispatcher.messages[responseMsgId].requestResolver
proc timeoutExpired(udata: pointer) = requestResolver(nil, responseFuture)
addTimer(timeoutAt, timeoutExpired, nil)
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
logScope:
msg = peer.dispatcher.messages[msgId].name
msgContents = peer.dispatcher.messages[msgId].printer(msg)
receivedReqId = reqId
remotePeer = peer.remote
template resolve(future) =
(peer.dispatcher.messages[msgId].requestResolver)(msg, future)
template outstandingReqs: auto =
peer.outstandingRequests[msgId]
if reqId == -1:
# XXX: This is a response from an ETH-like protocol that doesn't feature
# request IDs. Handling the response is quite tricky here because this may
# be a late response to an already timed out request or a valid response
# from a more recent one.
#
# We can increase the robustness by recording enough features of the
# request so we can recognize the matching response, but this is not very
# easy to do because our peers are allowed to send partial responses.
#
# A more generally robust approach is to maintain a set of the wanted
# data items and then to periodically look for items that have been
# requested long time ago, but are still missing. New requests can be
# issues for such items potentially from another random peer.
var expiredRequests = 0
for req in outstandingReqs:
if not req.future.finished: break
inc expiredRequests
outstandingReqs.shrink(fromFirst = expiredRequests)
if outstandingReqs.len > 0:
let oldestReq = outstandingReqs.popFirst
resolve oldestReq.future
else:
debug "late or duplicate reply for a RLPx request"
else:
# TODO: This is not completely sound because we are still using a global
# `reqId` sequence (the problem is that we might get a response ID that
# matches a request ID for a different type of request). To make the code
# correct, we can use a separate sequence per response type, but we have
# to first verify that the other Ethereum clients are supporting this
# correctly (because then, we'll be reusing the same reqIds for different
# types of requests). Alternatively, we can assign a separate interval in
# the `reqId` space for each type of response.
if reqId > peer.lastReqId:
warn "RLPx response without a matching request"
return
var idx = 0
while idx < outstandingReqs.len:
template req: auto = outstandingReqs()[idx]
if req.future.finished:
assert req.timeoutAt < fastEpochTime()
# Here we'll remove the expired request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
else:
outstandingReqs.shrink(fromLast = 1)
# This was the last item, so we don't have any
# more work to do:
return
if req.id == reqId:
resolve req.future
# Here we'll remove the found request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
else:
outstandingReqs.shrink(fromLast = 1)
return
inc idx
debug "late or duplicate reply for a RLPx request"
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
## This procs awaits the next complete RLPx message in the TCP stream
var headerBytes: array[32, byte]
await peer.transport.readExactly(addr headerBytes[0], 32)
var msgSize: int
if decryptHeaderAndGetMsgSize(peer.secretsState,
headerBytes, msgSize) != RlpxStatus.Success:
await peer.disconnectAndRaise(BreachOfProtocol,
"Cannot decrypt RLPx frame header")
if msgSize > maxMsgSize:
await peer.disconnectAndRaise(BreachOfProtocol,
"RLPx message exceeds maximum size")
let remainingBytes = encryptedLength(msgSize) - 32
# TODO: Migrate this to a thread-local seq
# JACEK:
# or pass it in, allowing the caller to choose - they'll likely be in a
# better position to decide if buffer should be reused or not. this will
# also be useuful for chunked messages where part of the buffer may have
# been processed and needs filling in
var encryptedBytes = newSeq[byte](remainingBytes)
await peer.transport.readExactly(addr encryptedBytes[0], len(encryptedBytes))
let decryptedMaxLength = decryptedLength(msgSize)
var
decryptedBytes = newSeq[byte](decryptedMaxLength)
decryptedBytesCount = 0
if decryptBody(peer.secretsState, encryptedBytes, msgSize,
decryptedBytes, decryptedBytesCount) != RlpxStatus.Success:
await peer.disconnectAndRaise(BreachOfProtocol,
"Cannot decrypt RLPx frame body")
decryptedBytes.setLen(decryptedBytesCount)
when useSnappy:
if peer.snappyEnabled:
decryptedBytes = snappy.uncompress(decryptedBytes)
if decryptedBytes.len == 0:
await peer.disconnectAndRaise(BreachOfProtocol,
"Snappy uncompress encountered malformed data")
var rlp = rlpFromBytes(decryptedBytes.toRange)
try:
let msgid = rlp.read(int)
return (msgId, rlp)
except RlpError:
await peer.disconnectAndRaise(BreachOfProtocol,
"Cannot read RLPx message id")
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.inline.} =
let tmp = r
when defined(release):
return r.read(MsgType)
else:
try:
return r.read(MsgType)
except:
# echo "Failed rlp.read:", tmp.inspect
error "Failed rlp.read",
peer = peer,
msg = MsgType.name,
exception = getCurrentExceptionMsg()
# dataHex = r.rawData.toSeq().toHex()
raise
proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
let wantedId = peer.perPeerMsgId(MsgType)
while true:
var (nextMsgId, nextMsgData) = await peer.recvMsg()
if nextMsgId == wantedId:
try:
result = checkedRlpRead(peer, nextMsgData, MsgType)
logReceivedMsg(peer, result)
return
except RlpError:
await peer.disconnectAndRaise(BreachOfProtocol,
"Invalid RLPx message body")
elif nextMsgId == 1: # p2p.disconnect
let reason = DisconnectionReason nextMsgData.listElem(0).toInt(uint32)
await peer.disconnect(reason, notifyOtherPeer = false)
raisePeerDisconnected("Unexpected disconnect", reason)
else:
warn "Dropped RLPX message",
msg = peer.dispatcher.messages[nextMsgId].name
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific RLPx message.
## Any messages received while waiting will be dispatched to their
## respective handlers. The designated message handler will also run
## to completion before the future returned by `nextMsg` is resolved.
let wantedId = peer.perPeerMsgId(MsgType)
let f = peer.awaitedMessages[wantedId]
if not f.isNil:
return Future[MsgType](f)
newFuture result
peer.awaitedMessages[wantedId] = result
proc dispatchMessages*(peer: Peer) {.async.} =
while true:
var (msgId, msgData) = await peer.recvMsg()
if msgId == 1: # p2p.disconnect
await peer.transport.closeWait()
let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason
await peer.disconnect(reason, notifyOtherPeer = false)
break
try:
await peer.invokeThunk(msgId, msgData)
except RlpError:
error "endind dispatchMessages loop", peer, err = getCurrentExceptionMsg()
await peer.disconnect(BreachOfProtocol)
return
if peer.awaitedMessages[msgId] != nil:
let msgInfo = peer.dispatcher.messages[msgId]
(msgInfo.nextMsgResolver)(msgData, peer.awaitedMessages[msgId])
peer.awaitedMessages[msgId] = nil
iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
for i in (1 + skip) ..< n.params.len:
let paramNodes = n.params[i]
let paramType = paramNodes[^2]
for j in 0 ..< paramNodes.len - 2:
yield (paramNodes[j], paramType)
proc chooseFieldType(n: NimNode): NimNode =
## Examines the parameter types used in the message signature
## and selects the corresponding field type for use in the
## message object type (i.e. `p2p.hello`).
##
## For now, only openarray types are remapped to sequences.
result = n
if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"):
result = n.copyNimTree
result[0] = ident("seq")
proc getState(peer: Peer, proto: ProtocolInfo): RootRef =
peer.protocolStates[proto.index]
template state*(peer: Peer, Protocol: type): untyped =
## Returns the state object of a particular protocol for a
## particular connection.
bind getState
cast[Protocol.State](getState(peer, Protocol.protocolInfo))
proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef =
node.protocolStates[proto.index]
template protocolState*(node: EthereumNode, Protocol: type): untyped =
bind getNetworkState
cast[Protocol.NetworkState](getNetworkState(node, Protocol.protocolInfo))
template networkState*(connection: Peer, Protocol: type): untyped =
## Returns the network state object of a particular protocol for a
## particular connection.
protocolState(connection.network, Protocol)
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) = discard
proc createPeerState[ProtocolState](peer: Peer): RootRef =
var res = new ProtocolState
mixin initProtocolState
initProtocolState(res, peer)
return cast[RootRef](res)
proc createNetworkState[NetworkState](network: EthereumNode): RootRef =
var res = new NetworkState
mixin initProtocolState
initProtocolState(res, network)
return cast[RootRef](res)
proc popTimeoutParam(n: NimNode): NimNode =
var lastParam = n.params[^1]
if eqIdent(lastParam[0], "timeout"):
if lastParam[2].kind == nnkEmpty:
macros.error "You must specify a default value for the `timeout` parameter", lastParam
result = lastParam
n.params.del(n.params.len - 1)
proc verifyStateType(t: NimNode): NimNode =
result = t[1]
if result.kind == nnkSym and $result == "nil":
return nil
if result.kind != nnkBracketExpr or $result[0] != "ref":
macros.error($result & " must be a ref type")
macro rlpxProtocolImpl(name: static[string],
version: static[uint],
body: untyped,
useRequestIds: static[bool] = true,
timeout: static[int] = defaultReqTimeout,
shortName: static[string] = "",
outgoingRequestDecorator: untyped = nil,
incomingRequestDecorator: untyped = nil,
incomingRequestThunkDecorator: untyped = nil,
incomingResponseDecorator: untyped = nil,
incomingResponseThunkDecorator: untyped = nil,
peerState = type(nil),
networkState = type(nil)): untyped =
## The macro used to defined RLPx sub-protocols. See README.
var
# XXX: deal with a Nim bug causing the macro params to be
# zero when they are captured by a closure:
outgoingRequestDecorator = outgoingRequestDecorator
incomingRequestDecorator = incomingRequestDecorator
incomingRequestThunkDecorator = incomingRequestThunkDecorator
incomingResponseDecorator = incomingResponseDecorator
incomingResponseThunkDecorator = incomingResponseThunkDecorator
useRequestIds = useRequestIds
version = version
defaultTimeout = timeout
nextId = 0
protoName = name
shortName = if shortName.len > 0: shortName else: protoName
outTypes = newNimNode(nnkStmtList)
outSendProcs = newNimNode(nnkStmtList)
outRecvProcs = newNimNode(nnkStmtList)
outProcRegistrations = newNimNode(nnkStmtList)
protoNameIdent = ident(protoName)
resultIdent = ident "result"
perProtocolMsgId = ident"perProtocolMsgId"
protocol = ident(protoName & "Protocol")
isSubprotocol = version > 0'u
peerState = verifyStateType peerState.getType
networkState = verifyStateType networkState.getType
handshake = newNilLit()
disconnectHandler = newNilLit()
Option = bindSym "Option"
# XXX: Binding the int type causes instantiation failure for some reason
# Int = bindSym "int"
Int = ident "int"
Peer = bindSym "Peer"
append = bindSym "append"
createNetworkState = bindSym "createNetworkState"
createPeerState = bindSym "createPeerState"
finish = bindSym "finish"
initRlpWriter = bindSym "initRlpWriter"
enterList = bindSym "enterList"
messagePrinter = bindSym "messagePrinter"
newProtocol = bindSym "newProtocol"
nextMsgResolver = bindSym "nextMsgResolver"
read = bindSym "read"
registerRequest = bindSym "registerRequest"
requestResolver = bindSym "requestResolver"
resolveResponseFuture = bindSym "resolveResponseFuture"
rlpFromBytes = bindSym "rlpFromBytes"
checkedRlpRead = bindSym "checkedRlpRead"
sendMsg = bindSym "sendMsg"
startList = bindSym "startList"
writeMsgId = bindSym "writeMsgId"
getState = bindSym "getState"
getNetworkState = bindSym "getNetworkState"
perPeerMsgId = bindSym "perPeerMsgId"
perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl"
linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert shortName.len == 3
template applyDecorator(p: NimNode, decorator: NimNode) =
if decorator.kind != nnkNilLit: p.addPragma decorator
proc augmentUserHandler(userHandlerProc: NimNode, msgId = -1, msgKind = rlpxNotification) =
## Turns a regular proc definition into an async proc and adds
## the helpers for accessing the peer and network protocol states.
case msgKind
of rlpxRequest: userHandlerProc.applyDecorator incomingRequestDecorator
of rlpxResponse: userHandlerProc.applyDecorator incomingResponseDecorator
else: discard
userHandlerProc.addPragma ident"async"
# We allow the user handler to use `openarray` params, but we turn
# those into sequences to make the `async` pragma happy.
for i in 1 ..< userHandlerProc.params.len:
var param = userHandlerProc.params[i]
param[^2] = chooseFieldType(param[^2])
var userHandlerDefinitions = newStmtList()
if msgId >= 0:
userHandlerDefinitions.add quote do:
const `perProtocolMsgId` = `msgId`
# Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar)
if peerState != nil:
userHandlerDefinitions.add quote do:
template state(p: `Peer`): `peerState` =
cast[`peerState`](`getState`(p, `protocol`))
if networkState != nil:
userHandlerDefinitions.add quote do:
template networkState(p: `Peer`): `networkState` =
cast[`networkState`](`getNetworkState`(p.network, `protocol`))
userHandlerProc.body.insert 0, userHandlerDefinitions
proc liftEventHandler(doBlock: NimNode, handlerName: string): NimNode =
## Turns a "named" do block to a regular async proc
## (e.g. onPeerConnected do ...)
var fn = newTree(nnkProcDef)
doBlock.copyChildrenTo(fn)
result = genSym(nskProc, protoName & handlerName)
fn.name = result
augmentUserHandler fn
outRecvProcs.add fn
proc addMsgHandler(msgId: int, n: NimNode,
msgKind = rlpxNotification,
responseMsgId = -1,
responseRecord: NimNode = nil): NimNode =
if n[0].kind == nnkPostfix:
macros.error("rlpxProcotol procs are public by default. " &
"Please remove the postfix `*`.", n)
let
msgIdent = n.name
msgName = $n.name
hasReqIds = useRequestIds and msgKind in {rlpxRequest, rlpxResponse}
var
paramCount = 0
userPragmas = n.pragma
# variables used in the sending procs
msgRecipient = ident"msgRecipient"
reqTimeout: NimNode
rlpWriter = ident"writer"
appendParams = newNimNode(nnkStmtList)
paramsToWrite = newSeq[NimNode](0)
reqId = ident"reqId"
perPeerMsgIdVar = ident"perPeerMsgId"
# variables used in the receiving procs
msgSender = ident"msgSender"
receivedRlp = ident"rlp"
receivedMsg = ident"msg"
readParams = newNimNode(nnkStmtList)
readParamsPrelude = newNimNode(nnkStmtList)
callResolvedResponseFuture = newNimNode(nnkStmtList)
# nodes to store the user-supplied message handling proc if present
userHandlerProc: NimNode = nil
userHandlerCall: NimNode = nil
awaitUserHandler = newStmtList()
# a record type associated with the message
msgRecord = newIdentNode(msgName & "Obj")
msgRecordFields = newTree(nnkRecList)
msgRecordBody = newTree(nnkObjectTy,
newEmptyNode(),
newEmptyNode(),
msgRecordFields)
result = msgRecord
if hasReqIds:
# Messages using request Ids
readParamsPrelude.add quote do:
let `reqId` = `read`(`receivedRlp`, int)
case msgKind
of rlpxNotification: discard
of rlpxRequest:
# If the request proc has a default timeout specified, remove it from
# the signature for now so we can generate the `thunk` proc without it.
# The parameter will be added back later only for to the sender proc.
# When the timeout is not specified, we use a default one.
reqTimeout = popTimeoutParam(n)
if reqTimeout == nil:
reqTimeout = newTree(nnkIdentDefs,
ident"timeout",
Int, newLit(defaultTimeout))
let reqToResponseOffset = responseMsgId - msgId
let responseMsgId = quote do: `perPeerMsgIdVar` + `reqToResponseOffset`
# Each request is registered so we can resolve it when the response
# arrives. There are two types of protocols: LES-like protocols use
# explicit `reqId` sent over the wire, while the ETH wire protocol
# assumes there is one outstanding request at a time (if there are
# multiple requests we'll resolve them in FIFO order).
let registerRequestCall = newCall(registerRequest, msgRecipient,
reqTimeout[0],
resultIdent,
responseMsgId)
if hasReqIds:
appendParams.add quote do:
newFuture `resultIdent`
let `reqId` = `registerRequestCall`
paramsToWrite.add reqId
else:
appendParams.add quote do:
newFuture `resultIdent`
discard `registerRequestCall`
of rlpxResponse:
let reqIdVal = if hasReqIds: `reqId` else: newLit(-1)
callResolvedResponseFuture.add quote do:
`resolveResponseFuture`(`msgSender`,
`perPeerMsgId`(`msgSender`, `msgRecord`),
addr(`receivedMsg`),
`reqIdVal`)
if hasReqIds:
paramsToWrite.add reqId
if n.body.kind != nnkEmpty:
# implement the receiving thunk proc that deserialzed the
# message parameters and calls the user proc:
userHandlerProc = n.copyNimTree
userHandlerProc.name = genSym(nskProc, msgName)
augmentUserHandler userHandlerProc, msgId, msgKind
# This is the call to the user supplied handled. Here we add only the
# initial peer param, while the rest of the params will be added later.
userHandlerCall = newCall(userHandlerProc.name, msgSender)
if hasReqIds:
userHandlerProc.params.insert(2, newIdentDefs(reqId, ident"int"))
userHandlerCall.add reqId
# When there is a user handler, it must be awaited in the thunk proc.
# Above, by default `awaitUserHandler` is set to a no-op statement list.
awaitUserHandler = newCall("await", userHandlerCall)
outRecvProcs.add(userHandlerProc)
for param, paramType in n.typedParams(skip = 1):
inc paramCount
# This is a fragment of the sending proc that
# serializes each of the passed parameters:
paramsToWrite.add param
# Each message has a corresponding record type.
# Here, we create its fields one by one:
msgRecordFields.add newTree(nnkIdentDefs,
newTree(nnkPostfix, ident("*"), param), # The fields are public
chooseFieldType(paramType), # some types such as openarray
# are automatically remapped
newEmptyNode())
# The received RLP data is deserialized to a local variable of
# the message-specific type. This is done field by field here:
let msgNameLit = newLit(msgName)
readParams.add quote do:
`receivedMsg`.`param` = `checkedRlpRead`(`msgSender`, `receivedRlp`, `paramType`)
# If there is user message handler, we'll place a call to it by
# unpacking the fields of the received message:
if userHandlerCall != nil:
userHandlerCall.add newDotExpr(receivedMsg, param)
if paramCount > 1:
readParamsPrelude.add newCall(enterList, receivedRlp)
when tracingEnabled:
readParams.add newCall(bindSym"logReceivedMsg", msgSender, receivedMsg)
let thunkName = ident(msgName & "_thunk")
var thunkProc = quote do:
proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) =
var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecord`
`readParamsPrelude`
`readParams`
`awaitUserHandler`
`callResolvedResponseFuture`
for p in userPragmas: thunkProc.addPragma p
case msgKind
of rlpxRequest: thunkProc.applyDecorator incomingRequestThunkDecorator
of rlpxResponse: thunkProc.applyDecorator incomingResponseThunkDecorator
else: discard
thunkProc.addPragma ident"async"
outRecvProcs.add thunkProc
outTypes.add quote do:
# This is a type featuring a single field for each message param:
type `msgRecord`* = `msgRecordBody`
# Add a helper template for accessing the message type:
# e.g. p2p.hello:
template `msgIdent`*(T: type `protoNameIdent`): type = `msgRecord`
# Add a helper template for obtaining the message Id for
# a particular message type:
template msgId*(T: type `msgRecord`): int = `msgId`
template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent`
var msgSendProc = n
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = msgRecipient
# Add a timeout parameter for all request procs
case msgKind
of rlpxRequest:
msgSendProc.params.add reqTimeout
of rlpxResponse:
if useRequestIds:
msgSendProc.params.insert 2, newIdentDefs(reqId, ident"int")
else: discard
# We change the return type of the sending proc to a Future.
# If this is a request proc, the future will return the response record.
let rt = if msgKind != rlpxRequest: ident"void"
else: newTree(nnkBracketExpr, Option, responseRecord)
msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt)
let msgBytes = ident"msgBytes"
let finalizeRequest = quote do:
let `msgBytes` = `finish`(`rlpWriter`)
var sendCall = newCall(sendMsg, msgRecipient, msgBytes)
let senderEpilogue = if msgKind == rlpxRequest:
# In RLPx requests, the returned future was allocated here and passed
# to `registerRequest`. It's already assigned to the result variable
# of the proc, so we just wait for the sending operation to complete
# and we return in a normal way. (the waiting is done, so we can catch
# any possible errors).
quote: `linkSendFailureToReqFuture`(`sendCall`, `resultIdent`)
else:
# In normal RLPx messages, we are returning the future returned by the
# `sendMsg` call.
quote: return `sendCall`
let `perPeerMsgIdValue` = if isSubprotocol:
newCall(perPeerMsgIdImpl, msgRecipient, protocol, newLit(msgId))
else:
newLit(msgId)
if paramCount > 1:
# In case there are more than 1 parameter,
# the params must be wrapped in a list:
appendParams = newStmtList(
newCall(startList, rlpWriter, newLit(paramCount)),
appendParams)
for p in paramsToWrite:
appendParams.add newCall(append, rlpWriter, p)
# Make the send proc public
msgSendProc.name = newTree(nnkPostfix, ident("*"), msgSendProc.name)
let initWriter = quote do:
var `rlpWriter` = `initRlpWriter`()
const `perProtocolMsgId` = `msgId`
let `perPeerMsgIdVar` = `perPeerMsgIdValue`
`append`(`rlpWriter`, `perPeerMsgIdVar`)
when tracingEnabled:
appendParams.add logSentMsgFields(msgRecipient, protocol, msgId, paramsToWrite)
# let paramCountNode = newLit(paramCount)
msgSendProc.body = quote do:
`initWriter`
`appendParams`
`finalizeRequest`
`senderEpilogue`
if msgKind == rlpxRequest:
msgSendProc.applyDecorator outgoingRequestDecorator
outSendProcs.add msgSendProc
outProcRegistrations.add(
newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(msgId),
newStrLitNode($n.name),
thunkName,
newTree(nnkBracketExpr, messagePrinter, msgRecord),
newTree(nnkBracketExpr, requestResolver, msgRecord),
newTree(nnkBracketExpr, nextMsgResolver, msgRecord)))
outTypes.add quote do:
# Create a type acting as a pseudo-object representing the protocol
# (e.g. p2p)
type `protoNameIdent`* = object
if peerState != nil:
outTypes.add quote do:
template State*(P: type `protoNameIdent`): type = `peerState`
if networkState != nil:
outTypes.add quote do:
template NetworkState*(P: type `protoNameIdent`): type = `networkState`
for n in body:
case n.kind
of {nnkCall, nnkCommand}:
if eqIdent(n[0], "nextID"):
# By default message IDs are assigned in increasing order
# `nextID` can be used to skip some of the numeric slots
if n.len == 2 and n[1].kind == nnkIntLit:
nextId = n[1].intVal.int
else:
macros.error("nextID expects a single int value", n)
elif eqIdent(n[0], "requestResponse"):
# `requestResponse` can be given a block of 2 or more procs.
# The last one is considered to be a response message, while
# all preceeding ones are requests triggering the response.
# The system makes sure to automatically insert a hidden `reqId`
# parameter used to discriminate the individual messages.
block processReqResp:
if n.len == 2 and n[1].kind == nnkStmtList:
var procs = newSeq[NimNode](0)
for def in n[1]:
if def.kind == nnkProcDef:
procs.add(def)
if procs.len > 1:
let responseMsgId = nextId + procs.len - 1
let responseRecord = addMsgHandler(responseMsgId,
procs[^1],
msgKind = rlpxResponse)
for i in 0 .. procs.len - 2:
discard addMsgHandler(nextId + i, procs[i],
msgKind = rlpxRequest,
responseMsgId = responseMsgId,
responseRecord = responseRecord)
inc nextId, procs.len
# we got all the way to here, so everything is fine.
# break the block so it doesn't reach the error call below
break processReqResp
macros.error("requestResponse expects a block with at least two proc definitions")
elif eqIdent(n[0], "onPeerConnected"):
handshake = liftEventHandler(n[1], "Handshake")
elif eqIdent(n[0], "onPeerDisconnected"):
disconnectHandler = liftEventHandler(n[1], "PeerDisconnect")
else:
macros.error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
of nnkProcDef:
discard addMsgHandler(nextId, n)
inc nextId
of nnkCommentStmt:
discard
else:
macros.error("illegal syntax in a RLPx protocol definition", n)
let peerInit = if peerState == nil: newNilLit()
else: newTree(nnkBracketExpr, createPeerState, peerState)
let netInit = if networkState == nil: newNilLit()
else: newTree(nnkBracketExpr, createNetworkState, networkState)
result = newNimNode(nnkStmtList)
result.add outTypes
result.add quote do:
# One global variable per protocol holds the protocol run-time data
var `protocol` = `newProtocol`(`shortName`, `version`, `peerInit`, `netInit`)
# The protocol run-time data is available as a pseudo-field
# (e.g. `p2p.protocolInfo`)
template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
result.add outSendProcs, outRecvProcs, outProcRegistrations
result.add quote do:
setEventHandlers(`protocol`, `handshake`, `disconnectHandler`)
result.add newCall(bindSym("registerProtocol"), protocol)
when isMainModule: echo repr(result)
when defined(debugRlpxProtocol) or defined(debugMacros):
echo repr(result)
macro rlpxProtocol*(protocolOptions: untyped, body: untyped): untyped =
let protoName = $(protocolOptions[0])
result = protocolOptions
result[0] = bindSym"rlpxProtocolImpl"
result.add(newTree(nnkExprEqExpr,
ident("name"),
newLit(protoName)))
result.add(newTree(nnkExprEqExpr,
ident("body"),
body))
rlpxProtocol p2p(version = 0):
proc hello(peer: Peer,
version: uint,
clientId: string,
capabilities: seq[Capability],
listenPort: uint,
nodeId: array[RawPublicKeySize, byte])
proc sendDisconnectMsg(peer: Peer, reason: DisconnectionReason)
proc ping(peer: Peer) =
discard peer.pong()
proc pong(peer: Peer) =
discard
proc removePeer(network: EthereumNode, peer: Peer) =
if network.peerPool != nil:
network.peerPool.connectedNodes.del(peer.remote)
for observer in network.peerPool.observers.values:
if not observer.onPeerDisconnected.isNil:
observer.onPeerDisconnected(peer)
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[void] =
var futures = newSeqOfCap[Future[void]](rlpxProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
if protocol.disconnectHandler != nil:
futures.add((protocol.disconnectHandler)(peer, reason))
return all(futures)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.} =
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
try:
# TODO: investigate the failure here
if false and notifyOtherPeer and not peer.transport.closed:
await peer.sendDisconnectMsg(reason)
finally:
if not peer.dispatcher.isNil:
await callDisconnectHandlers(peer, reason)
logDisconnectedPeer peer
peer.connectionState = Disconnected
removePeer(peer.network, peer)
proc validatePubKeyInHello(msg: p2p.hello, pubKey: PublicKey): bool =
var pk: PublicKey
recoverPublicKey(msg.nodeId, pk) == EthKeysStatus.Success and pk == pubKey
proc performSubProtocolHandshakes(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](rlpxProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer))
await all(subProtocolsHandshakes)
peer.connectionState = Connected
proc checkUselessPeer(peer: Peer) {.inline.} =
if peer.dispatcher.numProtocols == 0:
# XXX: Send disconnect + UselessPeer
raise newException(UselessPeerError, "Useless peer")
proc initPeerState*(peer: Peer, capabilities: openarray[Capability]) =
peer.dispatcher = getDispatcher(peer.network, capabilities)
checkUselessPeer(peer)
# The dispatcher has determined our message ID sequence.
# For each message ID, we allocate a potential slot for
# tracking responses to requests.
# (yes, some of the slots won't be used).
peer.outstandingRequests.newSeq(peer.dispatcher.messages.len)
for d in mitems(peer.outstandingRequests):
d = initDeque[OutstandingRequest]()
# Similarly, we need a bit of book-keeping data to keep track
# of the potentially concurrent calls to `nextMsg`.
peer.awaitedMessages.newSeq(peer.dispatcher.messages.len)
peer.lastReqId = 0
# Initialize all the active protocol states
newSeq(peer.protocolStates, rlpxProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
let peerStateInit = protocol.peerStateInitializer
if peerStateInit != nil:
peer.protocolStates[protocol.index] = peerStateInit(peer)
proc postHelloSteps(peer: Peer, h: p2p.hello): Future[void] =
initPeerState(peer, h.capabilities)
var messageProcessingLoop = peer.dispatchMessages()
messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} =
if messageProcessingLoop.failed:
asyncCheck peer.disconnect(ClientQuitting)
return performSubProtocolHandshakes(peer)
template `^`(arr): auto =
# passes a stack array with a matching `arrLen`
# variable as an open array
arr.toOpenArray(0, `arr Len` - 1)
proc check(status: AuthStatus) =
if status != AuthStatus.Success:
raise newException(Exception, "Error: " & $status)
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
p: Peer) =
var secrets: ConnectionSecret
check hs.getSecrets(authMsg, ackMsg, secrets)
initSecretState(secrets, p.secretsState)
burnMem(secrets)
template checkSnappySupport(node: EthereumNode, handshake: Handshake, peer: Peer) =
when useSnappy:
peer.snappyEnabled = node.protocolVersion >= devp2pSnappyVersion.uint and
handshake.version >= devp2pSnappyVersion.uint
template getVersion(handshake: Handshake): uint =
when useSnappy:
handshake.version
else:
devp2pVersion
template baseProtocolVersion(node: EthereumNode): untyped =
when useSnappy:
node.protocolVersion
else:
devp2pVersion
template baseProtocolVersion(peer: Peer): uint =
when useSnappy:
if peer.snappyEnabled: devp2pSnappyVersion
else: devp2pVersion
else:
devp2pVersion
proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
new result
result.network = node
result.remote = remote
let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort)
var ok = false
try:
result.transport = await connect(ta)
var handshake = newHandshake({Initiator, EIP8}, int(node.baseProtocolVersion))
handshake.host = node.keys
var authMsg: array[AuthMessageMaxEIP8, byte]
var authMsgLen = 0
check authMessage(handshake, remote.node.pubkey, authMsg, authMsgLen)
var res = result.transport.write(addr authMsg[0], authMsgLen)
let initialSize = handshake.expectedLength
var ackMsg = newSeqOfCap[byte](1024)
ackMsg.setLen(initialSize)
await result.transport.readExactly(addr ackMsg[0], len(ackMsg))
var ret = handshake.decodeAckMessage(ackMsg)
if ret == AuthStatus.IncompleteError:
ackMsg.setLen(handshake.expectedLength)
await result.transport.readExactly(addr ackMsg[initialSize],
len(ackMsg) - initialSize)
ret = handshake.decodeAckMessage(ackMsg)
check ret
node.checkSnappySupport(handshake, result)
initSecretState(handshake, ^authMsg, ackMsg, result)
# if handshake.remoteHPubkey != remote.node.pubKey:
# raise newException(Exception, "Remote pubkey is wrong")
logConnectedPeer result
asyncCheck result.hello(handshake.getVersion(),
node.clientId,
node.rlpxCapabilities,
uint(node.address.tcpPort),
node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(p2p.hello)
if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
await postHelloSteps(result, response)
ok = true
except PeerDisconnected as e:
if e.reason != TooManyPeers:
debug "Unexpected disconnect during rlpxConnect", reason = e.reason
except TransportIncompleteError:
debug "Connection dropped in rlpxConnect", remote
except UselessPeerError:
debug "Useless peer ", peer = remote
except RlpTypeMismatch:
# Some peers report capabilities with names longer than 3 chars. We ignore
# those for now. Maybe we should allow this though.
debug "Rlp error in rlpxConnect"
except:
info "Exception in rlpxConnect", remote,
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
if not ok:
if not isNil(result.transport):
result.transport.close()
result = nil
proc rlpxAccept*(node: EthereumNode,
transport: StreamTransport): Future[Peer] {.async.} =
new result
result.transport = transport
result.network = node
var handshake = newHandshake({Responder})
handshake.host = node.keys
try:
let initialSize = handshake.expectedLength
var authMsg = newSeqOfCap[byte](1024)
authMsg.setLen(initialSize)
await transport.readExactly(addr authMsg[0], len(authMsg))
var ret = handshake.decodeAuthMessage(authMsg)
if ret == AuthStatus.IncompleteError: # Eip8 auth message is likely
authMsg.setLen(handshake.expectedLength)
await transport.readExactly(addr authMsg[initialSize],
len(authMsg) - initialSize)
ret = handshake.decodeAuthMessage(authMsg)
check ret
node.checkSnappySupport(handshake, result)
handshake.version = uint8(result.baseProtocolVersion)
var ackMsg: array[AckMessageMaxEIP8, byte]
var ackMsgLen: int
check handshake.ackMessage(ackMsg, ackMsgLen)
var res = transport.write(addr ackMsg[0], ackMsgLen)
initSecretState(handshake, authMsg, ^ackMsg, result)
let listenPort = transport.localAddress().port
logAcceptedPeer result
await result.hello(result.baseProtocolVersion, node.clientId,
node.rlpxCapabilities, listenPort.uint,
node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(p2p.hello)
if not validatePubKeyInHello(response, handshake.remoteHPubkey):
warn "A Remote nodeId is not its public key" # XXX: Do we care?
let remote = transport.remoteAddress()
let address = Address(ip: remote.address, tcpPort: remote.port,
udpPort: remote.port)
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
await postHelloSteps(result, response)
except:
error "Exception in rlpxAccept",
err = getCurrentExceptionMsg(),
stackTrace = getCurrentException().getStackTrace()
transport.close()
result = nil
when isMainModule:
when false:
# The assignments below can be used to investigate if the RLPx procs
# are considered GcSafe. The short answer is that they aren't, because
# they dispatch into user code that might use the GC.
type
GcSafeDispatchMsg = proc (peer: Peer, msgId: int, msgData: var Rlp)
GcSafeRecvMsg = proc (peer: Peer):
Future[tuple[msgId: int, msgData: Rlp]] {.gcsafe.}
GcSafeAccept = proc (transport: StreamTransport, myKeys: KeyPair):
Future[Peer] {.gcsafe.}
var
dispatchMsgPtr = invokeThunk
recvMsgPtr: GcSafeRecvMsg = recvMsg
acceptPtr: GcSafeAccept = rlpxAccept